Skip to content

Commit fde6b5a

Browse files
author
chenyijiang
committed
Add merged properties config.
1 parent e40e0d0 commit fde6b5a

File tree

3 files changed

+130
-54
lines changed

3 files changed

+130
-54
lines changed

sdk-component/src/main/java/group/rxcloud/capa/component/telemetry/SamplerConfig.java

Lines changed: 32 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,12 @@
1616
*/
1717
package group.rxcloud.capa.component.telemetry;
1818

19-
import com.google.common.collect.Lists;
2019
import group.rxcloud.capa.component.telemetry.metrics.CapaMeterProviderBuilder;
2120
import group.rxcloud.capa.infrastructure.CapaProperties;
22-
import group.rxcloud.capa.infrastructure.hook.ConfigurationHooks;
21+
import group.rxcloud.capa.infrastructure.hook.MergedPropertiesConfig;
2322
import group.rxcloud.capa.infrastructure.hook.Mixer;
24-
import group.rxcloud.cloudruntimes.domain.core.configuration.SubConfigurationResp;
25-
import group.rxcloud.cloudruntimes.utils.TypeRef;
26-
import org.apache.commons.collections.CollectionUtils;
27-
import org.apache.commons.lang3.StringUtils;
2823
import org.slf4j.Logger;
2924
import org.slf4j.LoggerFactory;
30-
import reactor.core.publisher.Flux;
3125

3226
import java.io.Serializable;
3327
import java.util.function.Supplier;
@@ -37,12 +31,6 @@
3731
*/
3832
public class SamplerConfig implements Serializable {
3933

40-
private static final long serialVersionUID = -2113523925814197551L;
41-
42-
public static final transient String FILE_PATH = "capa-component-telemetry-sample.properties";
43-
44-
public static final transient String COMMON_FILE_SUFFIX = "telemetry-common";
45-
4634
/**
4735
* Sample all data as default.
4836
*/
@@ -53,25 +41,44 @@ public class SamplerConfig implements Serializable {
5341

5442
public static final transient SamplerConfig CONFIG = new SamplerConfig();
5543

56-
public static final transient Supplier<SamplerConfig> DEFAULT_SUPPLIER = () -> {
57-
return CONFIG;
58-
};
44+
private static final long serialVersionUID = -2113523925814197551L;
5945

6046
private static final transient Logger log = LoggerFactory.getLogger(CapaMeterProviderBuilder.class);
6147

48+
public static transient Supplier<SamplerConfig> DEFAULT_SUPPLIER = () -> {
49+
return CONFIG;
50+
};
6251

6352
static {
6453
Mixer.configurationHooksNullable().ifPresent(hooks -> {
54+
55+
String fileName = "capa-component-telemetry-sample.properties";
56+
57+
String suffix = "telemetry-common";
58+
6559
try {
66-
subscribeConfiguration(hooks, hooks.defaultConfigurationAppId(), true);
67-
} catch (Throwable throwable) {
68-
log.warn("Fail to load global telemetry config. Dynamic global config is disabled for capa telemetry.",
69-
throwable);
70-
}
71-
try {
72-
subscribeConfiguration(hooks,
73-
CapaProperties.COMPONENT_PROPERTIES_SUPPLIER.apply(COMMON_FILE_SUFFIX).getProperty("appId"),
74-
false);
60+
MergedPropertiesConfig config = new MergedPropertiesConfig(fileName, hooks
61+
.defaultConfigurationAppId(),
62+
CapaProperties.COMPONENT_PROPERTIES_SUPPLIER.apply(suffix).getProperty("appId"));
63+
String metricKey = "metricsEnable";
64+
String traceKey = "traceEnable";
65+
SamplerConfig dynamicConfig = new SamplerConfig() {
66+
@Override
67+
public Boolean isMetricsEnable() {
68+
return !config.containsKey(metricKey) || Boolean.TRUE.toString()
69+
.equalsIgnoreCase(config.get(metricKey));
70+
}
71+
72+
@Override
73+
public Boolean isTraceEnable() {
74+
return !config.containsKey(traceKey) || Boolean.TRUE.toString()
75+
.equalsIgnoreCase(config.get(traceKey));
76+
}
77+
};
78+
79+
DEFAULT_SUPPLIER = () -> {
80+
return dynamicConfig;
81+
};
7582
} catch (Throwable throwable) {
7683
log.warn("Fail to load global telemetry config. Dynamic global config is disabled for capa telemetry.",
7784
throwable);
@@ -84,31 +91,6 @@ public class SamplerConfig implements Serializable {
8491

8592
private Boolean traceEnable;
8693

87-
private static void subscribeConfiguration(ConfigurationHooks configurationHooks, String appId, boolean prior) {
88-
String storeName = configurationHooks.registryStoreNames().get(0);
89-
Flux<SubConfigurationResp<SamplerConfig>> configFlux = configurationHooks.subscribeConfiguration(
90-
storeName,
91-
appId,
92-
Lists.newArrayList(FILE_PATH),
93-
null,
94-
StringUtils.EMPTY,
95-
StringUtils.EMPTY,
96-
TypeRef.get(SamplerConfig.class));
97-
configFlux.subscribe(resp -> {
98-
if (CollectionUtils.isNotEmpty(resp.getItems())) {
99-
SamplerConfig config = resp.getItems().get(0).getContent();
100-
if (config != null) {
101-
if (config.metricsEnable != null && (prior || CONFIG.metricsEnable == null)) {
102-
CONFIG.metricsEnable = config.metricsEnable;
103-
}
104-
if (config.traceEnable != null && (prior || CONFIG.traceEnable == null)) {
105-
CONFIG.traceEnable = config.traceEnable;
106-
}
107-
}
108-
}
109-
});
110-
}
111-
11294
public Boolean isMetricsEnable() {
11395
return metricsEnable == null ? DEFAULT_CONFIG.metricsEnable : metricsEnable;
11496
}

sdk-component/src/test/java/group/rxcloud/capa/component/telemetry/TestMixerProvider.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@
2727

2828
import java.util.List;
2929
import java.util.Map;
30+
import java.util.Properties;
3031

3132
/**
3233
* @author: chenyijiang
3334
* @date: 2021/12/2 12:39
3435
*/
3536
public class TestMixerProvider implements Mixer.MixerProvider {
3637

37-
SamplerConfig app = new SamplerConfig() {{setMetricsEnable(false);}};
38-
SamplerConfig global = new SamplerConfig() {{setTraceEnable(false); setMetricsEnable(true);}};
38+
Properties app = new Properties() {{put("metricsEnable", false);}};
39+
Properties global = new Properties() {{put("traceEnable", false); put("metricsEnable", true);}};
3940

4041

4142
private ConfigurationHooks configurationHooks = new ConfigurationHooks() {
@@ -52,7 +53,7 @@ public String defaultConfigurationAppId() {
5253
@Override
5354
public <T> Flux<SubConfigurationResp<T>> subscribeConfiguration(String storeName, String appId, List<String> keys, Map<String, String> metadata, String group, String label, TypeRef<T> type) {
5455

55-
if (type.getType() == SamplerConfig.class) {
56+
if (type.getType() == Properties.class) {
5657
if ("123".equals(appId)) {
5758
return Flux.just(getSubscribeResponse(global));
5859
}
@@ -65,7 +66,7 @@ public <T> Flux<SubConfigurationResp<T>> subscribeConfiguration(String storeName
6566
}
6667
};
6768

68-
private <T> SubConfigurationResp<T> getSubscribeResponse(SamplerConfig samplerConfig) {
69+
private <T> SubConfigurationResp<T> getSubscribeResponse(Properties samplerConfig) {
6970
SubConfigurationResp<T> subConfigurationResp = new SubConfigurationResp<>();
7071
ConfigurationItem item = new ConfigurationItem();
7172
item.setContent(samplerConfig);
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package group.rxcloud.capa.infrastructure.hook;
18+
19+
import group.rxcloud.cloudruntimes.domain.core.configuration.SubConfigurationResp;
20+
import group.rxcloud.cloudruntimes.utils.TypeRef;
21+
import reactor.core.publisher.Flux;
22+
23+
import java.util.Collections;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.Properties;
27+
import java.util.concurrent.atomic.AtomicReferenceArray;
28+
29+
/**
30+
* Config provider to merge multiple properties file which takes the input order as their priority.
31+
*/
32+
public class MergedPropertiesConfig {
33+
34+
private final String fileName;
35+
36+
private final AtomicReferenceArray<Properties> properties;
37+
38+
private final Object lock = new Object();
39+
40+
private volatile Map<String, String> merged;
41+
42+
public MergedPropertiesConfig(String fileName, String... appIds) {
43+
this.fileName = fileName;
44+
properties = new AtomicReferenceArray<>(appIds.length);
45+
merged = new HashMap<>();
46+
Mixer.configurationHooksNullable().ifPresent(hooks -> {
47+
for (int i = 0; i < appIds.length; i++) {
48+
subscribeConfigurationByAppId(hooks, appIds[i], i);
49+
}
50+
});
51+
}
52+
53+
public boolean containsKey(String key) {
54+
return merged.containsKey(key);
55+
}
56+
57+
public String get(String key) {
58+
return merged.get(key);
59+
}
60+
61+
private void subscribeConfigurationByAppId(ConfigurationHooks configurationHooks, String appId, int index) {
62+
String storeName = configurationHooks.registryStoreNames().get(0);
63+
64+
Flux<SubConfigurationResp<Properties>> configFlux = configurationHooks.subscribeConfiguration(
65+
storeName,
66+
appId,
67+
Collections.singletonList(fileName),
68+
null,
69+
"",
70+
"",
71+
TypeRef.get(Properties.class));
72+
73+
74+
configFlux.subscribe(resp -> {
75+
if (!resp.getItems().isEmpty()) {
76+
properties.set(index, resp.getItems().get(0).getContent());
77+
} else {
78+
properties.set(index, null);
79+
}
80+
81+
synchronized (lock) {
82+
Map<String, String> merged = new HashMap<>();
83+
for (int i = 0; i < properties.length(); i++) {
84+
Properties item = properties.get(i);
85+
if (item != null) {
86+
item.forEach((k, v) -> merged.putIfAbsent(String.valueOf(k), String.valueOf(v)));
87+
}
88+
}
89+
this.merged = merged;
90+
}
91+
});
92+
}
93+
}

0 commit comments

Comments
 (0)