-
Notifications
You must be signed in to change notification settings - Fork 324
ConfigProvider iterates over all sources and reports all non-null values to telemetry #9404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 18 commits
08c80b4
c62688b
125744a
8becbc5
db60faf
750ba64
3006e7c
bee84bf
bfcfd7e
b411ea9
d097300
1ca7fd9
81db67d
1b81e8a
9b22670
cdf3a12
7053f6c
ff95cdc
4094c53
1d70667
bdab76d
2482933
bc64730
11b61f1
d7dc744
700f837
b190961
779551d
94f6485
e796896
3997f56
9794631
84ab423
a9c1f7a
ff39a53
0408e49
e3b307d
81187aa
0ffe413
b5e1414
b048c91
0ef089d
7ef2a9f
7508fc7
49d3ced
fe9b376
d9dac26
474cd40
8e29e03
365f5d6
4a91a8e
d9fe333
2626c62
5b5ad0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,14 @@ | ||
| package datadog.trace.api; | ||
|
|
||
| import static datadog.trace.api.ConfigOrigin.DEFAULT; | ||
| import static datadog.trace.api.ConfigOrigin.REMOTE; | ||
| import static datadog.trace.api.ConfigSetting.ABSENT_SEQ_ID; | ||
| import static datadog.trace.api.ConfigSetting.DEFAULT_SEQ_ID; | ||
|
|
||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; | ||
|
|
||
| /** | ||
|
|
@@ -16,47 +22,81 @@ public class ConfigCollector { | |
| private static final AtomicReferenceFieldUpdater<ConfigCollector, Map> COLLECTED_UPDATER = | ||
| AtomicReferenceFieldUpdater.newUpdater(ConfigCollector.class, Map.class, "collected"); | ||
|
|
||
| private volatile Map<String, ConfigSetting> collected = new ConcurrentHashMap<>(); | ||
| private volatile Map<ConfigOrigin, Map<String, ConfigSetting>> collected = | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just out of curiosity, why is the map ConfigOrigin -> Map<String, ConfigSetting>? would it be more performant for look ups if it was the other way around? Map<ConfigName, Map<ConfigOrigin, ConfigValue>>?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This reduces the number of nested maps from O(# of configs) to O(# of origins).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like ConfigCollector is a singleton and we're keeping all the configuration information in place the entire time. That could significantly increasing our static overhead which is a bit worrisome. I think I'd prefer to see us just recompute the information as needed in the telemetry thread instead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mhlidd ^ I agree with @dougqh's comment. Most of this information is really only needed for the app-started event on tracer startup, no need to hold it in memory it can just be dropped. The only time telemetry is reported again is infrequently during app-client-configuration events which is mainly just remote config & in that scenario no need to read all sources again you only need to report the remote-config source and value with the current global seqId and the intake will be able to know what is currently active in the tracer.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dougqh Is your concern the fact that we are keeping telemetry data throughout the entire tracer lifecycle? The telemetry client will call
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is the same pattern as the previous implementation - config gets collected for reporting (but not processed) until the next telemetry cycle. At that point the collected map gets atomically swapped out, so that fresh config can be collected in a new map while telemetry reports the previous batch in the background. That way we avoid contention from having publishing and consuming threads work on the same map at the same time. Eventually both sides end up with an empty map. Also as for putting origin vs name first in the mapping - as Matt says, origin has very low cardinality, so that way we end up with only a few sub-maps. Whereas name has high cardinality, putting that first would lead to many tiny maps - each of which would have a lot of overhead compared to the data they contain. Note we process the collected config data in the background on a separate thread, so any performance impact during processing won't affect application latency. When collecting it doesn't affect performance to put origin first. |
||
| new ConcurrentHashMap<>(); | ||
|
|
||
| private volatile Map<String, AtomicInteger> highestSeqId = new ConcurrentHashMap<>(); | ||
|
|
||
| public static ConfigCollector get() { | ||
| return INSTANCE; | ||
| } | ||
|
|
||
| // There are no non-test usages of this function | ||
| public void put(String key, Object value, ConfigOrigin origin) { | ||
| ConfigSetting setting = ConfigSetting.of(key, value, origin); | ||
| collected.put(key, setting); | ||
| put(key, value, origin, ABSENT_SEQ_ID, null); | ||
| } | ||
|
|
||
| public void putRemoteConfig(String key, Object value) { | ||
| int remoteSeqId = | ||
| highestSeqId.containsKey(key) ? highestSeqId.get(key).get() + 1 : DEFAULT_SEQ_ID + 1; | ||
| put(key, value, REMOTE, remoteSeqId, null); | ||
| } | ||
mhlidd marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| public void put(String key, Object value, ConfigOrigin origin, int seqId) { | ||
mhlidd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| put(key, value, origin, seqId, null); | ||
| } | ||
|
|
||
| // There are no usages of this function | ||
| public void put(String key, Object value, ConfigOrigin origin, String configId) { | ||
|
||
| ConfigSetting setting = ConfigSetting.of(key, value, origin, configId); | ||
| collected.put(key, setting); | ||
| put(key, value, origin, ABSENT_SEQ_ID, configId); | ||
| } | ||
|
|
||
| public void putAll(Map<String, Object> keysAndValues, ConfigOrigin origin) { | ||
| // attempt merge+replace to avoid collector seeing partial update | ||
| Map<String, ConfigSetting> merged = | ||
| new ConcurrentHashMap<>(keysAndValues.size() + collected.size()); | ||
| for (Map.Entry<String, Object> entry : keysAndValues.entrySet()) { | ||
| ConfigSetting setting = ConfigSetting.of(entry.getKey(), entry.getValue(), origin); | ||
| merged.put(entry.getKey(), setting); | ||
| public void put(String key, Object value, ConfigOrigin origin, int seqId, String configId) { | ||
mhlidd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ConfigSetting setting = ConfigSetting.of(key, value, origin, seqId, configId); | ||
| Map<String, ConfigSetting> configMap = | ||
| collected.computeIfAbsent(origin, k -> new ConcurrentHashMap<>()); | ||
| configMap.put(key, setting); // replaces any previous value for this key at origin | ||
| highestSeqId.computeIfAbsent(key, k -> new AtomicInteger()).set(seqId); | ||
| } | ||
|
|
||
| // put method specifically for DEFAULT origins. We don't allow overrides for configs from DEFAULT | ||
| // origins | ||
| public void putDefault(String key, Object value) { | ||
| ConfigSetting setting = ConfigSetting.of(key, value, DEFAULT, DEFAULT_SEQ_ID); | ||
khanayan123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Map<String, ConfigSetting> configMap = | ||
| collected.computeIfAbsent(DEFAULT, k -> new ConcurrentHashMap<>()); | ||
| if (!configMap.containsKey(key) || configMap.get(key).value == null) { | ||
| configMap.put(key, setting); | ||
| } | ||
| while (true) { | ||
| Map<String, ConfigSetting> current = collected; | ||
| current.forEach(merged::putIfAbsent); | ||
| if (COLLECTED_UPDATER.compareAndSet(this, current, merged)) { | ||
| break; // success | ||
| } | ||
| // roll back to original update before next attempt | ||
| merged.keySet().retainAll(keysAndValues.keySet()); | ||
| } | ||
|
|
||
| public void putRemoteConfigPayload(Map<String, Object> keysAndValues, ConfigOrigin origin) { | ||
|
||
| for (Map.Entry<String, Object> entry : keysAndValues.entrySet()) { | ||
| putRemoteConfig(entry.getKey(), entry.getValue()); | ||
| } | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| public Map<String, ConfigSetting> collect() { | ||
| public Map<ConfigOrigin, Map<String, ConfigSetting>> collect() { | ||
| if (!collected.isEmpty()) { | ||
| return COLLECTED_UPDATER.getAndSet(this, new ConcurrentHashMap<>()); | ||
| } else { | ||
| return Collections.emptyMap(); | ||
| } | ||
| } | ||
|
|
||
| // NOTE: Only used to preserve legacy behavior for with smoke tests | ||
| public static ConfigSetting getAppliedConfigSetting( | ||
| String key, Map<ConfigOrigin, Map<String, ConfigSetting>> configMap) { | ||
| ConfigSetting best = null; | ||
| for (Map<String, ConfigSetting> originConfigMap : configMap.values()) { | ||
| ConfigSetting setting = originConfigMap.get(key); | ||
| if (setting != null) { | ||
| if (best == null || setting.seqId > best.seqId) { | ||
| best = setting; | ||
| } | ||
| } | ||
| } | ||
| return best; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,11 @@ public final class ConfigSetting { | |
| public final String key; | ||
| public final Object value; | ||
| public final ConfigOrigin origin; | ||
|
|
||
| public final int seqId; | ||
| public static final int DEFAULT_SEQ_ID = 1; | ||
| public static final int ABSENT_SEQ_ID = 0; | ||
|
||
|
|
||
| /** The config ID associated with this setting, or {@code null} if not applicable. */ | ||
| public final String configId; | ||
|
|
||
|
|
@@ -19,17 +24,27 @@ public final class ConfigSetting { | |
| Arrays.asList("DD_API_KEY", "dd.api-key", "dd.profiling.api-key", "dd.profiling.apikey")); | ||
|
|
||
| public static ConfigSetting of(String key, Object value, ConfigOrigin origin) { | ||
| return new ConfigSetting(key, value, origin, null); | ||
| return new ConfigSetting(key, value, origin, ABSENT_SEQ_ID, null); | ||
| } | ||
|
|
||
| public static ConfigSetting of(String key, Object value, ConfigOrigin origin, int seqId) { | ||
| return new ConfigSetting(key, value, origin, seqId, null); | ||
| } | ||
|
|
||
| public static ConfigSetting of(String key, Object value, ConfigOrigin origin, String configId) { | ||
| return new ConfigSetting(key, value, origin, configId); | ||
| return new ConfigSetting(key, value, origin, ABSENT_SEQ_ID, configId); | ||
| } | ||
|
|
||
| public static ConfigSetting of( | ||
| String key, Object value, ConfigOrigin origin, int seqId, String configId) { | ||
| return new ConfigSetting(key, value, origin, seqId, configId); | ||
| } | ||
|
|
||
| private ConfigSetting(String key, Object value, ConfigOrigin origin, String configId) { | ||
| private ConfigSetting(String key, Object value, ConfigOrigin origin, int seqId, String configId) { | ||
| this.key = key; | ||
| this.value = CONFIG_FILTER_LIST.contains(key) ? "<hidden>" : value; | ||
| this.origin = origin; | ||
| this.seqId = seqId; | ||
| this.configId = configId; | ||
| } | ||
|
|
||
|
|
@@ -109,12 +124,13 @@ public boolean equals(Object o) { | |
| return key.equals(that.key) | ||
| && Objects.equals(value, that.value) | ||
| && origin == that.origin | ||
| && seqId == that.seqId | ||
| && Objects.equals(configId, that.configId); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(key, value, origin, configId); | ||
| return Objects.hash(key, value, origin, seqId, configId); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -127,6 +143,8 @@ public String toString() { | |
| + stringValue() | ||
| + ", origin=" | ||
| + origin | ||
| + ", seqId=" | ||
| + seqId | ||
| + ", configId=" | ||
| + configId | ||
| + '}'; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm looking into ways to remove this call, as smoke tests shouldn't be relying on internal-api
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fe9b376