Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.AbstractContextlessScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -112,7 +113,7 @@ public void validate(final TimeValue settingValue, final Map<Setting<?>, Object>
@Nullable
private volatile TimeValue failuresDefaultRetention;
/** We cache the global retention objects, volatile is sufficient we only "write" this values in the settings appliers which
* are executed by {@link org.elasticsearch.common.settings.AbstractScopedSettings#applySettings(Settings)} which is synchronised.
* are executed by {@link AbstractContextlessScopedSettings#applySettings(Settings)} which is synchronised.
*/
@Nullable
private volatile DataStreamGlobalRetention dataGlobalRetention;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ public long getProjectsMarkedForDeletionGeneration() {
return projectsMarkedForDeletionGeneration;
}

// visible for testing
Set<ProjectId> knownProjects() {
public Set<ProjectId> knownProjects() {
return projectsEntries.keySet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectStateRegistry;
import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.ProjectScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand Down Expand Up @@ -78,6 +81,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";

private final ClusterSettings clusterSettings;
private final ProjectScopedSettings projectScopedSettings;
private final ThreadPool threadPool;

private volatile TimeValue slowTaskLoggingThreshold;
Expand All @@ -103,8 +107,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private NodeConnectionsService nodeConnectionsService;

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterApplierService(String nodeName, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(nodeName, clusterSettings, new ProjectScopedSettings(), threadPool);
}

public ClusterApplierService(
String nodeName,
ClusterSettings clusterSettings,
ProjectScopedSettings projectScopedSettings,
ThreadPool threadPool
) {
this.clusterSettings = clusterSettings;
this.projectScopedSettings = projectScopedSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.nodeName = nodeName;
Expand Down Expand Up @@ -518,14 +532,7 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu
connectToNodesAndWait(newClusterState);
}

// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metadataChanged()) {
logger.debug("applying settings from cluster state with version {}", newClusterState.version());
final Settings incomingSettings = clusterChangedEvent.state().metadata().settings();
try (Releasable ignored = stopWatch.record("applying settings")) {
clusterSettings.applySettings(incomingSettings);
}
}
applySettings(clusterChangedEvent, stopWatch);

logger.debug("apply cluster state with version {}", newClusterState.version());
callClusterStateAppliers(clusterChangedEvent, stopWatch);
Expand All @@ -538,6 +545,35 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu
callClusterStateListeners(clusterChangedEvent, stopWatch);
}

private void applySettings(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch) {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false) {
if (clusterChangedEvent.metadataChanged()) {
logger.debug("applying settings from cluster state with version {}", clusterChangedEvent.state().version());
final Settings incomingSettings = clusterChangedEvent.state().metadata().settings();
try (Releasable ignored = stopWatch.record("applying settings")) {
clusterSettings.applySettings(incomingSettings);
}
}

ProjectStateRegistry oldProjectStateRegistry = clusterChangedEvent.previousState()
.custom(ProjectStateRegistry.TYPE, ProjectStateRegistry.EMPTY);
ProjectStateRegistry newProjectStateRegistry = clusterChangedEvent.state()
.custom(ProjectStateRegistry.TYPE, ProjectStateRegistry.EMPTY);
if (oldProjectStateRegistry != newProjectStateRegistry) {
for (ProjectId projectId : newProjectStateRegistry.knownProjects()) {
Settings oldProjectSettings = oldProjectStateRegistry.getProjectSettings(projectId);
Settings newProjectSettings = newProjectStateRegistry.getProjectSettings(projectId);
if (newProjectSettings.equals(oldProjectSettings) == false) {
try (Releasable ignored = stopWatch.record("applying project settings")) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have project id here?

projectScopedSettings.applySettings(projectId, newProjectSettings);
}
}
}
}
}
}

protected void connectToNodesAndWait(ClusterState newClusterState) {
// can't wait for an ActionFuture on the cluster applier thread, but we do want to block the thread here, so use a CountDownLatch.
final CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread
this(
settings,
clusterSettings,
new ProjectScopedSettings(settings, Collections.emptySet()),
new ProjectScopedSettings(Collections.emptySet()),
new MasterService(settings, clusterSettings, threadPool, taskManager),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
new ClusterApplierService(
Node.NODE_NAME_SETTING.get(settings),
clusterSettings,
new ProjectScopedSettings(Collections.emptySet()),
threadPool
)
);
}

Expand All @@ -81,7 +86,7 @@ public ClusterService(
clusterSettings,
projectScopedSettings,
new MasterService(settings, clusterSettings, threadPool, taskManager),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), clusterSettings, projectScopedSettings, threadPool)
);
}

Expand All @@ -91,7 +96,7 @@ public ClusterService(
MasterService masterService,
ClusterApplierService clusterApplierService
) {
this(settings, clusterSettings, new ProjectScopedSettings(settings, Collections.emptySet()), masterService, clusterApplierService);
this(settings, clusterSettings, new ProjectScopedSettings(Collections.emptySet()), masterService, clusterApplierService);
}

public ClusterService(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.settings;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.TriConsumer;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public abstract class AbstractContextlessScopedSettings extends AbstractScopedSettings<Void> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this name, but didn't manage to find anything better

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add Javadoc then? Since the name isn't especially informative.


protected final Settings settings;
private Settings lastSettingsApplied;

public AbstractContextlessScopedSettings(
Settings nodeSettings,
Settings scopeSettings,
AbstractContextlessScopedSettings other,
Logger logger
) {
super(other, logger);

this.settings = nodeSettings;
this.lastSettingsApplied = scopeSettings;
}

public AbstractContextlessScopedSettings(Settings settings, Set<Setting<?>> settingsSet, Setting.Property scope) {
super(settingsSet, scope);

this.settings = settings;
this.lastSettingsApplied = Settings.EMPTY;
}

/**
* Validates the given settings by running it through all update listeners without applying it. This
* method will not change any settings but will fail if any of the settings can't be applied.
*/
public synchronized Settings validateUpdate(Settings settings) {
final Settings current = Settings.builder().put(this.settings).put(settings).build();
final Settings previous = Settings.builder().put(this.settings).put(this.lastSettingsApplied).build();
validateUpdate(current, previous);

return current;
}

/**
* Applies the given settings to all the settings consumers or to none of them. The settings
* will be merged with the node settings before they are applied while given settings override existing node
* settings.
* @param newSettings the settings to apply
* @return the unmerged applied settings
*/
public synchronized Settings applySettings(Settings newSettings) {
if (lastSettingsApplied != null && newSettings.equals(lastSettingsApplied)) {
// nothing changed in the settings, ignore
return newSettings;
}
final Settings current = Settings.builder().put(this.settings).put(newSettings).build();
final Settings previous = Settings.builder().put(this.settings).put(this.lastSettingsApplied).build();
executeSettingsUpdaters(null, current, previous);

return lastSettingsApplied = newSettings;
}

/**
* Returns the value for the given setting.
*/
public <T> T get(Setting<T> setting) {
if (setting.getProperties().contains(scope) == false) {
throw new IllegalArgumentException(
"settings scope doesn't match the setting scope [" + this.scope + "] not in [" + setting.getProperties() + "]"
);
}
if (get(setting.getKey()) == null) {
throw new IllegalArgumentException("setting " + setting.getKey() + " has not been registered");
}
return setting.get(this.lastSettingsApplied, settings);
}

private static <T, V> TriConsumer<Void, T, V> wrapIgnoringContext(BiConsumer<T, V> consumer) {
return (ctx, t, v) -> consumer.accept(t, v);
}

private static <V> BiConsumer<Void, V> wrapIgnoringContext(Consumer<V> consumer) {
return (ctx, v) -> consumer.accept(v);
}

public synchronized <T> void addSettingsUpdateConsumer(Setting<T> setting, Consumer<T> consumer, Consumer<T> validator) {
super.addSettingsUpdateConsumer(setting, wrapIgnoringContext(consumer), validator);
}

public synchronized <T> void addSettingsUpdateConsumer(Setting<T> setting, Consumer<T> consumer) {
super.addSettingsUpdateConsumer(setting, wrapIgnoringContext(consumer));
}

public synchronized void addSettingsUpdateConsumer(Consumer<Settings> consumer, List<? extends Setting<?>> settings) {
super.addSettingsUpdateConsumer(wrapIgnoringContext(consumer), settings);
}

public synchronized void addSettingsUpdateConsumer(
Consumer<Settings> consumer,
List<? extends Setting<?>> settings,
Consumer<Settings> validator
) {
super.addSettingsUpdateConsumer(wrapIgnoringContext(consumer), settings, validator);
}

public synchronized <T> void addAffixUpdateConsumer(
Setting.AffixSetting<T> setting,
BiConsumer<String, T> consumer,
BiConsumer<String, T> validator
) {
super.addAffixUpdateConsumer(setting, wrapIgnoringContext(consumer), validator);
}

public synchronized void addAffixGroupUpdateConsumer(List<Setting.AffixSetting<?>> settings, BiConsumer<String, Settings> consumer) {
super.addAffixGroupUpdateConsumer(settings, wrapIgnoringContext(consumer));
}

public synchronized <T> void addAffixMapUpdateConsumer(
Setting.AffixSetting<T> setting,
Consumer<Map<String, T>> consumer,
BiConsumer<String, T> validator
) {
super.addAffixMapUpdateConsumer(setting, wrapIgnoringContext(consumer), validator);
}

public synchronized <A, B> void addSettingsUpdateConsumer(
Setting<A> a,
Setting<B> b,
BiConsumer<A, B> consumer,
BiConsumer<A, B> validator
) {
super.addSettingsUpdateConsumer(a, b, wrapIgnoringContext(consumer), validator);
}

public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, BiConsumer<A, B> consumer) {
super.addSettingsUpdateConsumer(a, b, wrapIgnoringContext(consumer));
}
}
Loading