Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.AbstractContextlessScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -112,7 +113,7 @@ public void validate(final TimeValue settingValue, final Map<Setting<?>, Object>
@Nullable
private volatile TimeValue failuresDefaultRetention;
/** We cache the global retention objects, volatile is sufficient we only "write" this values in the settings appliers which
* are executed by {@link org.elasticsearch.common.settings.AbstractScopedSettings#applySettings(Settings)} which is synchronised.
* are executed by {@link AbstractContextlessScopedSettings#applySettings(Settings)} which is synchronised.
*/
@Nullable
private volatile DataStreamGlobalRetention dataGlobalRetention;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ public long getProjectsMarkedForDeletionGeneration() {
return projectsMarkedForDeletionGeneration;
}

// visible for testing
Set<ProjectId> knownProjects() {
public Set<ProjectId> knownProjects() {
return projectsEntries.keySet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectStateRegistry;
import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.ProjectScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand Down Expand Up @@ -78,6 +81,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";

private final ClusterSettings clusterSettings;
private final ProjectScopedSettings projectScopedSettings;
private final ThreadPool threadPool;

private volatile TimeValue slowTaskLoggingThreshold;
Expand All @@ -103,8 +107,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private NodeConnectionsService nodeConnectionsService;

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterApplierService(String nodeName, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(nodeName, clusterSettings, new ProjectScopedSettings(), threadPool);
}

public ClusterApplierService(
String nodeName,
ClusterSettings clusterSettings,
ProjectScopedSettings projectScopedSettings,
ThreadPool threadPool
) {
this.clusterSettings = clusterSettings;
this.projectScopedSettings = projectScopedSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.nodeName = nodeName;
Expand Down Expand Up @@ -518,14 +532,7 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu
connectToNodesAndWait(newClusterState);
}

// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metadataChanged()) {
logger.debug("applying settings from cluster state with version {}", newClusterState.version());
final Settings incomingSettings = clusterChangedEvent.state().metadata().settings();
try (Releasable ignored = stopWatch.record("applying settings")) {
clusterSettings.applySettings(incomingSettings);
}
}
applySettings(clusterChangedEvent, stopWatch);

logger.debug("apply cluster state with version {}", newClusterState.version());
callClusterStateAppliers(clusterChangedEvent, stopWatch);
Expand All @@ -538,6 +545,35 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu
callClusterStateListeners(clusterChangedEvent, stopWatch);
}

private void applySettings(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch) {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false) {
if (clusterChangedEvent.metadataChanged()) {
logger.debug("applying settings from cluster state with version {}", clusterChangedEvent.state().version());
final Settings incomingSettings = clusterChangedEvent.state().metadata().settings();
try (Releasable ignored = stopWatch.record("applying settings")) {
clusterSettings.applySettings(incomingSettings);
}
}

ProjectStateRegistry oldProjectStateRegistry = clusterChangedEvent.previousState()
.custom(ProjectStateRegistry.TYPE, ProjectStateRegistry.EMPTY);
ProjectStateRegistry newProjectStateRegistry = clusterChangedEvent.state()
.custom(ProjectStateRegistry.TYPE, ProjectStateRegistry.EMPTY);
if (oldProjectStateRegistry != newProjectStateRegistry) {
for (ProjectId projectId : newProjectStateRegistry.knownProjects()) {
Settings oldProjectSettings = oldProjectStateRegistry.getProjectSettings(projectId);
Settings newProjectSettings = newProjectStateRegistry.getProjectSettings(projectId);
if (newProjectSettings.equals(oldProjectSettings) == false) {
try (Releasable ignored = stopWatch.record("applying project settings")) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have project id here?

projectScopedSettings.applySettings(projectId, newProjectSettings);
}
}
}
}
}
}

protected void connectToNodesAndWait(ClusterState newClusterState) {
// can't wait for an ActionFuture on the cluster applier thread, but we do want to block the thread here, so use a CountDownLatch.
final CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread
this(
settings,
clusterSettings,
new ProjectScopedSettings(settings, Collections.emptySet()),
new ProjectScopedSettings(Collections.emptySet()),
new MasterService(settings, clusterSettings, threadPool, taskManager),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
new ClusterApplierService(
Node.NODE_NAME_SETTING.get(settings),
clusterSettings,
new ProjectScopedSettings(Collections.emptySet()),
threadPool
)
);
}

Expand All @@ -81,7 +86,7 @@ public ClusterService(
clusterSettings,
projectScopedSettings,
new MasterService(settings, clusterSettings, threadPool, taskManager),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), clusterSettings, projectScopedSettings, threadPool)
);
}

Expand All @@ -91,7 +96,7 @@ public ClusterService(
MasterService masterService,
ClusterApplierService clusterApplierService
) {
this(settings, clusterSettings, new ProjectScopedSettings(settings, Collections.emptySet()), masterService, clusterApplierService);
this(settings, clusterSettings, new ProjectScopedSettings(Collections.emptySet()), masterService, clusterApplierService);
}

public ClusterService(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.settings;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.TriConsumer;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public abstract class AbstractContextlessScopedSettings extends AbstractScopedSettings<Void> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this name, but didn't manage to find anything better

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add Javadoc then? Since the name isn't especially informative.


protected final Settings settings;
private Settings lastSettingsApplied;

public AbstractContextlessScopedSettings(
Settings nodeSettings,
Settings scopeSettings,
AbstractContextlessScopedSettings other,
Logger logger
) {
super(other, logger);

this.settings = nodeSettings;
this.lastSettingsApplied = scopeSettings;
}

public AbstractContextlessScopedSettings(Settings settings, Set<Setting<?>> settingsSet, Setting.Property scope) {
super(settingsSet, scope);

this.settings = settings;
this.lastSettingsApplied = Settings.EMPTY;
}

/**
* Validates the given settings by running it through all update listeners without applying it. This
* method will not change any settings but will fail if any of the settings can't be applied.
*/
public synchronized Settings validateUpdate(Settings settings) {
final Settings current = Settings.builder().put(this.settings).put(settings).build();
final Settings previous = Settings.builder().put(this.settings).put(this.lastSettingsApplied).build();
validateUpdate(current, previous);

return current;
}

/**
* Applies the given settings to all the settings consumers or to none of them. The settings
* will be merged with the node settings before they are applied while given settings override existing node
* settings.
* @param newSettings the settings to apply
* @return the unmerged applied settings
*/
public synchronized Settings applySettings(Settings newSettings) {
if (lastSettingsApplied != null && newSettings.equals(lastSettingsApplied)) {
// nothing changed in the settings, ignore
return newSettings;
}
final Settings current = Settings.builder().put(this.settings).put(newSettings).build();
final Settings previous = Settings.builder().put(this.settings).put(this.lastSettingsApplied).build();
executeSettingsUpdaters(null, current, previous);

return lastSettingsApplied = newSettings;
}

/**
* Returns the value for the given setting.
*/
public <T> T get(Setting<T> setting) {
if (setting.getProperties().contains(scope) == false) {
throw new IllegalArgumentException(
"settings scope doesn't match the setting scope [" + this.scope + "] not in [" + setting.getProperties() + "]"
);
}
if (get(setting.getKey()) == null) {
throw new IllegalArgumentException("setting " + setting.getKey() + " has not been registered");
}
return setting.get(this.lastSettingsApplied, settings);
}

private static <T, V> TriConsumer<Void, T, V> wrapIgnoringContext(BiConsumer<T, V> consumer) {
return (ctx, t, v) -> consumer.accept(t, v);
}

private static <V> BiConsumer<Void, V> wrapIgnoringContext(Consumer<V> consumer) {
return (ctx, v) -> consumer.accept(v);
}

/**
* Adds a settings consumer with a predicate that is only evaluated at update time.
* <p>
* Note: Only settings registered in {@link SettingsModule} can be changed dynamically.
* </p>
* @param <T> The type of the setting's value.
* @param setting The setting for which the updates are to be handled.
* @param consumer A {@link BiConsumer} that will be executed with the updated setting value.
* @param validator an additional validator that is only applied to updates of this setting.
* This is useful to add additional validation to settings at runtime compared to at startup time.
*/
public synchronized <T> void addSettingsUpdateConsumer(Setting<T> setting, Consumer<T> consumer, Consumer<T> validator) {
super.addSettingsUpdateConsumer(setting, wrapIgnoringContext(consumer), validator);
}

/**
* Adds a settings consumer.
* <p>
* Note: Only settings registered in {@link org.elasticsearch.cluster.ClusterModule} can be changed dynamically.
* </p>
*/
public synchronized <T> void addSettingsUpdateConsumer(Setting<T> setting, Consumer<T> consumer) {
super.addSettingsUpdateConsumer(setting, wrapIgnoringContext(consumer));
}

/**
* Adds a settings consumer that is only executed if any setting in the supplied list of settings is changed. In that case all the
* settings are specified in the argument are returned.
*
* Also automatically adds empty consumers for all settings in order to activate logging
*/
public synchronized void addSettingsUpdateConsumer(Consumer<Settings> consumer, List<? extends Setting<?>> settings) {
super.addSettingsUpdateConsumer(wrapIgnoringContext(consumer), settings);
}

/**
* Adds a settings consumer that is only executed if any setting in the supplied list of settings is changed. In that case all the
* settings are specified in the argument are returned. The validator is run across all specified settings before the settings are
* applied.
*
* Also automatically adds empty consumers for all settings in order to activate logging
*/
public synchronized void addSettingsUpdateConsumer(
Consumer<Settings> consumer,
List<? extends Setting<?>> settings,
Consumer<Settings> validator
) {
super.addSettingsUpdateConsumer(wrapIgnoringContext(consumer), settings, validator);
}

/**
* Adds a settings consumer for affix settings. Affix settings have a namespace associated to it that needs to be available to the
* consumer in order to be processed correctly.
*/
public synchronized <T> void addAffixUpdateConsumer(
Setting.AffixSetting<T> setting,
BiConsumer<String, T> consumer,
BiConsumer<String, T> validator
) {
super.addAffixUpdateConsumer(setting, wrapIgnoringContext(consumer), validator);
}

/**
* Adds a affix settings consumer that accepts the settings for a group of settings. The consumer is only
* notified if at least one of the settings change.
* <p>
* Note: Only settings registered in {@link SettingsModule} can be changed dynamically.
* </p>
*/
public synchronized void addAffixGroupUpdateConsumer(List<Setting.AffixSetting<?>> settings, BiConsumer<String, Settings> consumer) {
super.addAffixGroupUpdateConsumer(settings, wrapIgnoringContext(consumer));
}

/**
* Adds a settings consumer for affix settings. Affix settings have a namespace associated to it that needs to be available to the
* consumer in order to be processed correctly. This consumer will get a namespace to value map instead of each individual namespace
* and value as in {@link #addAffixUpdateConsumer(Setting.AffixSetting, BiConsumer, BiConsumer)}
*/
public synchronized <T> void addAffixMapUpdateConsumer(
Setting.AffixSetting<T> setting,
Consumer<Map<String, T>> consumer,
BiConsumer<String, T> validator
) {
super.addAffixMapUpdateConsumer(setting, wrapIgnoringContext(consumer), validator);
}

/**
* Adds a settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change
* and if the provided validator succeeded.
* <p>
* Note: Only settings registered in {@link SettingsModule} can be changed dynamically.
* </p>
* This method registers a compound updater that is useful if two settings are depending on each other.
* The consumer is always provided with both values even if only one of the two changes.
*/
public synchronized <A, B> void addSettingsUpdateConsumer(
Setting<A> a,
Setting<B> b,
BiConsumer<A, B> consumer,
BiConsumer<A, B> validator
) {
super.addSettingsUpdateConsumer(a, b, wrapIgnoringContext(consumer), validator);
}

/**
* Adds a settings consumer that accepts the values for two settings.
* See {@link #addSettingsUpdateConsumer(Setting, Setting, BiConsumer, BiConsumer)} for details.
*/
public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, BiConsumer<A, B> consumer) {
super.addSettingsUpdateConsumer(a, b, wrapIgnoringContext(consumer));
}
}
Loading