Skip to content

Commit 959d7e8

Browse files
ES-11463 Components are notified to updates to per-project settings
1 parent c666679 commit 959d7e8

25 files changed

+711
-321
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void validate(final TimeValue settingValue, final Map<Setting<?>, Object>
112112
@Nullable
113113
private volatile TimeValue failuresDefaultRetention;
114114
/** We cache the global retention objects, volatile is sufficient we only "write" this values in the settings appliers which
115-
* are executed by {@link org.elasticsearch.common.settings.AbstractScopedSettings#applySettings(Settings)} which is synchronised.
115+
* are executed by {@link org.elasticsearch.common.settings.AbstractContextlessScopedSettings#applySettings(Settings)} which is synchronised.
116116
*/
117117
@Nullable
118118
private volatile DataStreamGlobalRetention dataGlobalRetention;

server/src/main/java/org/elasticsearch/cluster/project/ProjectStateRegistry.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,7 @@ public long getProjectsMarkedForDeletionGeneration() {
175175
return projectsMarkedForDeletionGeneration;
176176
}
177177

178-
// visible for testing
179-
Set<ProjectId> knownProjects() {
178+
public Set<ProjectId> knownProjects() {
180179
return projectsEntries.keySet();
181180
}
182181

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
import org.elasticsearch.cluster.NodeConnectionsService;
2222
import org.elasticsearch.cluster.TimeoutClusterStateListener;
2323
import org.elasticsearch.cluster.metadata.Metadata;
24+
import org.elasticsearch.cluster.metadata.ProjectId;
2425
import org.elasticsearch.cluster.node.DiscoveryNodes;
26+
import org.elasticsearch.cluster.project.ProjectStateRegistry;
2527
import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder;
2628
import org.elasticsearch.common.Priority;
2729
import org.elasticsearch.common.component.AbstractLifecycleComponent;
2830
import org.elasticsearch.common.settings.ClusterSettings;
31+
import org.elasticsearch.common.settings.ProjectScopedSettings;
2932
import org.elasticsearch.common.settings.Setting;
3033
import org.elasticsearch.common.settings.Settings;
3134
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -78,6 +81,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
7881
public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";
7982

8083
private final ClusterSettings clusterSettings;
84+
private final ProjectScopedSettings projectScopedSettings;
8185
private final ThreadPool threadPool;
8286

8387
private volatile TimeValue slowTaskLoggingThreshold;
@@ -103,8 +107,13 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
103107

104108
private NodeConnectionsService nodeConnectionsService;
105109

106-
public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
110+
public ClusterApplierService(String nodeName, ClusterSettings clusterSettings, ThreadPool threadPool) {
111+
this(nodeName, clusterSettings, new ProjectScopedSettings(), threadPool);
112+
}
113+
114+
public ClusterApplierService(String nodeName, ClusterSettings clusterSettings, ProjectScopedSettings projectScopedSettings, ThreadPool threadPool) {
107115
this.clusterSettings = clusterSettings;
116+
this.projectScopedSettings = projectScopedSettings;
108117
this.threadPool = threadPool;
109118
this.state = new AtomicReference<>();
110119
this.nodeName = nodeName;
@@ -518,14 +527,7 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu
518527
connectToNodesAndWait(newClusterState);
519528
}
520529

521-
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
522-
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metadataChanged()) {
523-
logger.debug("applying settings from cluster state with version {}", newClusterState.version());
524-
final Settings incomingSettings = clusterChangedEvent.state().metadata().settings();
525-
try (Releasable ignored = stopWatch.record("applying settings")) {
526-
clusterSettings.applySettings(incomingSettings);
527-
}
528-
}
530+
applySettings(clusterChangedEvent, stopWatch);
529531

530532
logger.debug("apply cluster state with version {}", newClusterState.version());
531533
callClusterStateAppliers(clusterChangedEvent, stopWatch);
@@ -538,6 +540,33 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu
538540
callClusterStateListeners(clusterChangedEvent, stopWatch);
539541
}
540542

543+
private void applySettings(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch) {
544+
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
545+
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false) {
546+
if (clusterChangedEvent.metadataChanged()) {
547+
logger.debug("applying settings from cluster state with version {}", clusterChangedEvent.state().version());
548+
final Settings incomingSettings = clusterChangedEvent.state().metadata().settings();
549+
try (Releasable ignored = stopWatch.record("applying settings")) {
550+
clusterSettings.applySettings(incomingSettings);
551+
}
552+
}
553+
554+
ProjectStateRegistry oldProjectStateRegistry = clusterChangedEvent.previousState().custom(ProjectStateRegistry.TYPE, ProjectStateRegistry.EMPTY);
555+
ProjectStateRegistry newProjectStateRegistry = clusterChangedEvent.state().custom(ProjectStateRegistry.TYPE, ProjectStateRegistry.EMPTY);
556+
if (oldProjectStateRegistry != newProjectStateRegistry) {
557+
for (ProjectId projectId : newProjectStateRegistry.knownProjects()) {
558+
Settings oldProjectSettings = oldProjectStateRegistry.getProjectSettings(projectId);
559+
Settings newProjectSettings = newProjectStateRegistry.getProjectSettings(projectId);
560+
if (newProjectSettings.equals(oldProjectSettings) == false) {
561+
try (Releasable ignored = stopWatch.record("applying project settings")) {
562+
projectScopedSettings.applySettings(projectId, newProjectSettings);
563+
}
564+
}
565+
}
566+
}
567+
}
568+
}
569+
541570
protected void connectToNodesAndWait(ClusterState newClusterState) {
542571
// can't wait for an ActionFuture on the cluster applier thread, but we do want to block the thread here, so use a CountDownLatch.
543572
final CountDownLatch countDownLatch = new CountDownLatch(1);

server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread
6363
this(
6464
settings,
6565
clusterSettings,
66-
new ProjectScopedSettings(settings, Collections.emptySet()),
66+
new ProjectScopedSettings(Collections.emptySet()),
6767
new MasterService(settings, clusterSettings, threadPool, taskManager),
68-
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
68+
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), clusterSettings,
69+
new ProjectScopedSettings(Collections.emptySet()), threadPool)
6970
);
7071
}
7172

@@ -81,7 +82,7 @@ public ClusterService(
8182
clusterSettings,
8283
projectScopedSettings,
8384
new MasterService(settings, clusterSettings, threadPool, taskManager),
84-
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
85+
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), clusterSettings, projectScopedSettings, threadPool)
8586
);
8687
}
8788

@@ -91,7 +92,7 @@ public ClusterService(
9192
MasterService masterService,
9293
ClusterApplierService clusterApplierService
9394
) {
94-
this(settings, clusterSettings, new ProjectScopedSettings(settings, Collections.emptySet()), masterService, clusterApplierService);
95+
this(settings, clusterSettings, new ProjectScopedSettings(Collections.emptySet()), masterService, clusterApplierService);
9596
}
9697

9798
public ClusterService(
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.common.settings;
11+
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.common.TriConsumer;
14+
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.Set;
18+
import java.util.function.BiConsumer;
19+
import java.util.function.Consumer;
20+
21+
public abstract class AbstractContextlessScopedSettings extends AbstractScopedSettings<Void> {
22+
23+
protected final Settings settings;
24+
private Settings lastSettingsApplied;
25+
26+
public AbstractContextlessScopedSettings(Settings nodeSettings, Settings scopeSettings, AbstractContextlessScopedSettings other, Logger logger) {
27+
super(other, logger);
28+
29+
this.settings = nodeSettings;
30+
this.lastSettingsApplied = scopeSettings;
31+
}
32+
33+
public AbstractContextlessScopedSettings(Settings settings, Set<Setting<?>> settingsSet, Setting.Property scope) {
34+
super(settingsSet, scope);
35+
36+
this.settings = settings;
37+
this.lastSettingsApplied = Settings.EMPTY;
38+
}
39+
40+
/**
41+
* Validates the given settings by running it through all update listeners without applying it. This
42+
* method will not change any settings but will fail if any of the settings can't be applied.
43+
*/
44+
public synchronized Settings validateUpdate(Settings settings) {
45+
final Settings current = Settings.builder().put(this.settings).put(settings).build();
46+
final Settings previous = Settings.builder().put(this.settings).put(this.lastSettingsApplied).build();
47+
validateUpdate(current, previous);
48+
49+
return current;
50+
}
51+
52+
/**
53+
* Applies the given settings to all the settings consumers or to none of them. The settings
54+
* will be merged with the node settings before they are applied while given settings override existing node
55+
* settings.
56+
* @param newSettings the settings to apply
57+
* @return the unmerged applied settings
58+
*/
59+
public synchronized Settings applySettings(Settings newSettings) {
60+
if (lastSettingsApplied != null && newSettings.equals(lastSettingsApplied)) {
61+
// nothing changed in the settings, ignore
62+
return newSettings;
63+
}
64+
final Settings current = Settings.builder().put(this.settings).put(newSettings).build();
65+
final Settings previous = Settings.builder().put(this.settings).put(this.lastSettingsApplied).build();
66+
executeSettingsUpdaters(null, current, previous);
67+
68+
return lastSettingsApplied = newSettings;
69+
}
70+
71+
/**
72+
* Returns the value for the given setting.
73+
*/
74+
public <T> T get(Setting<T> setting) {
75+
if (setting.getProperties().contains(scope) == false) {
76+
throw new IllegalArgumentException(
77+
"settings scope doesn't match the setting scope [" + this.scope + "] not in [" + setting.getProperties() + "]"
78+
);
79+
}
80+
if (get(setting.getKey()) == null) {
81+
throw new IllegalArgumentException("setting " + setting.getKey() + " has not been registered");
82+
}
83+
return setting.get(this.lastSettingsApplied, settings);
84+
}
85+
86+
private static <T, V> TriConsumer<Void, T, V> wrapIgnoringContext(BiConsumer<T, V> consumer) {
87+
return (ctx, t, v) -> consumer.accept(t, v);
88+
}
89+
90+
private static <V> BiConsumer<Void, V> wrapIgnoringContext(Consumer<V> consumer) {
91+
return (ctx, v) -> consumer.accept(v);
92+
}
93+
94+
public synchronized <T> void addSettingsUpdateConsumer(Setting<T> setting, Consumer<T> consumer, Consumer<T> validator) {
95+
super.addSettingsUpdateConsumer(setting, wrapIgnoringContext(consumer), validator);
96+
}
97+
98+
public synchronized <T> void addSettingsUpdateConsumer(Setting<T> setting, Consumer<T> consumer) {
99+
super.addSettingsUpdateConsumer(setting, wrapIgnoringContext(consumer));
100+
}
101+
102+
public synchronized void addSettingsUpdateConsumer(Consumer<Settings> consumer, List<? extends Setting<?>> settings) {
103+
super.addSettingsUpdateConsumer(wrapIgnoringContext(consumer), settings);
104+
}
105+
106+
public synchronized void addSettingsUpdateConsumer(
107+
Consumer<Settings> consumer,
108+
List<? extends Setting<?>> settings,
109+
Consumer<Settings> validator
110+
) {
111+
super.addSettingsUpdateConsumer(wrapIgnoringContext(consumer), settings, validator);
112+
}
113+
114+
public synchronized <T> void addAffixUpdateConsumer(
115+
Setting.AffixSetting<T> setting,
116+
BiConsumer<String, T> consumer,
117+
BiConsumer<String, T> validator
118+
) {
119+
super.addAffixUpdateConsumer(setting, wrapIgnoringContext(consumer), validator);
120+
}
121+
122+
public synchronized void addAffixGroupUpdateConsumer(List<Setting.AffixSetting<?>> settings, BiConsumer<String, Settings> consumer) {
123+
super.addAffixGroupUpdateConsumer(settings, wrapIgnoringContext(consumer));
124+
}
125+
126+
public synchronized <T> void addAffixMapUpdateConsumer(
127+
Setting.AffixSetting<T> setting,
128+
Consumer<Map<String, T>> consumer,
129+
BiConsumer<String, T> validator
130+
) {
131+
super.addAffixMapUpdateConsumer(setting, wrapIgnoringContext(consumer), validator);
132+
}
133+
134+
public synchronized <A, B> void addSettingsUpdateConsumer(
135+
Setting<A> a,
136+
Setting<B> b,
137+
BiConsumer<A, B> consumer,
138+
BiConsumer<A, B> validator
139+
) {
140+
super.addSettingsUpdateConsumer(a, b, wrapIgnoringContext(consumer), validator);
141+
}
142+
143+
public synchronized <A, B> void addSettingsUpdateConsumer(
144+
Setting<A> a,
145+
Setting<B> b,
146+
BiConsumer<A, B> consumer
147+
) {
148+
super.addSettingsUpdateConsumer(a, b, wrapIgnoringContext(consumer));
149+
}
150+
}

0 commit comments

Comments
 (0)