Skip to content

Commit 1dedf2c

Browse files
committed
feature:support multiple subscribers for same config file
1 parent b4ac3fb commit 1dedf2c

File tree

8 files changed

+256
-59
lines changed

8 files changed

+256
-59
lines changed

capa-spi-aws-config/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
<parent>
2424
<artifactId>capa-aws-parent</artifactId>
2525
<groupId>group.rxcloud</groupId>
26-
<version>${revision}</version>
26+
<version>1.0.1.RELEASE</version>
2727
</parent>
2828

2929
<artifactId>capa-spi-aws-config</artifactId>

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/AwsCapaConfiguration.java

Lines changed: 163 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.List;
4141
import java.util.Map;
4242
import java.util.Objects;
43+
import java.util.Optional;
4344
import java.util.UUID;
4445
import java.util.concurrent.ConcurrentHashMap;
4546
import java.util.concurrent.ExecutionException;
@@ -82,6 +83,17 @@ public AwsCapaConfiguration(CapaObjectSerializer objectSerializer) {
8283
serializerProcessor = new SerializerProcessor(objectSerializer);
8384
}
8485

86+
@Override
87+
public String stopSubscribe() {
88+
AwsCapaConfigurationScheduler.INSTANCE.configSubscribePollingScheduler.dispose();
89+
return "success";
90+
}
91+
92+
@Override
93+
public void close() {
94+
//no need
95+
}
96+
8597
@Override
8698
protected void doInit(StoreConfig storeConfig) {
8799
//no need
@@ -93,10 +105,8 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
93105
if (CollectionUtils.isNullOrEmpty(keys)) {
94106
return Mono.error(new IllegalArgumentException("keys is null or empty"));
95107
}
96-
97108
//todo:need to get the specific env from system properties
98109
String applicationName = appId + "_FAT";
99-
100110
String configurationName = keys.get(0);
101111
String clientConfigurationVersion = getCurVersion(applicationName, configurationName);
102112

@@ -116,7 +126,10 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
116126
items.add((ConfigurationItem<T>) getCurConfigurationItem(applicationName, configurationName));
117127
} else {
118128
//if version changes,update versionMap and return
119-
items.add(updateConfigurationItem(applicationName, configurationName, type, resp.content(), resp.configurationVersion()));
129+
Configuration<T> tConfiguration = updateConfigurationItem(applicationName, configurationName, type, resp.content(), resp.configurationVersion());
130+
if (tConfiguration != null) {
131+
items.add(tConfiguration.getConfigurationItem());
132+
}
120133
}
121134
return items;
122135
});
@@ -126,12 +139,20 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
126139
protected <T> Flux<SubscribeResp<T>> doSubscribe(String appId, String group, String label, List<String> keys, Map<String, String> metadata, TypeRef<T> type) {
127140
//todo:need to get the specific env from system properties
128141
String applicationName = appId + "_FAT";
129-
130142
String configurationName = keys.get(0);
131143

132-
return Flux.create(fluxSink -> {
133-
AwsCapaConfigurationScheduler.INSTANCE.configSubscribePollingScheduler
134-
.schedulePeriodically(() -> {
144+
initSubscribe(applicationName, configurationName, group, label, metadata, type);
145+
return doSub(applicationName, configurationName, group, label, metadata, type, appId);
146+
}
147+
148+
private synchronized <T> Mono<Boolean> initConfig(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type) {
149+
//double check whether has been initialized
150+
if (isInitialized(applicationName, configurationName)) {
151+
return Mono.just(true);
152+
}
153+
return Mono.create(monoSink -> {
154+
AwsCapaConfigurationScheduler.INSTANCE.configInitScheduler
155+
.schedule(() -> {
135156
String version = getCurVersion(applicationName, configurationName);
136157

137158
GetConfigurationRequest request = GetConfigurationRequest.builder()
@@ -149,45 +170,90 @@ protected <T> Flux<SubscribeResp<T>> doSubscribe(String appId, String group, Str
149170
LOGGER.error("error occurs when getConfiguration,configurationName:{},version:{}", request.configuration(), request.clientConfigurationVersion(), e);
150171
}
151172
if (resp != null && !Objects.equals(resp.configurationVersion(), version)) {
152-
/*
153-
the reason why not use publisher scheduler to update configuration item is that switch thread needs time,
154-
when the polling frequency is high,the second polling request may happens before the first request has been
155-
update successfully by publisher thread. In that case,the subscriber may receive several signals for one actual
156-
change event.
157-
*/
158-
ConfigurationItem<T> configurationItem = updateConfigurationItem(applicationName, configurationName, type, resp.content(), resp.configurationVersion());
159-
SubscribeResp<T> subscribeResp = convertToSubscribeResp(configurationItem, appId);
160-
fluxSink.next(subscribeResp);
173+
initConfigurationItem(applicationName, configurationName, type, resp.content(), resp.configurationVersion());
174+
monoSink.success(true);
161175
}
162-
}, 0, 1, TimeUnit.SECONDS);
176+
}, 0, TimeUnit.SECONDS);
163177
});
164178
}
165179

166-
private <T> SubscribeResp<T> convertToSubscribeResp(ConfigurationItem<T> item, String appId) {
167-
SubscribeResp<T> resp = new SubscribeResp<>();
168-
resp.setStoreName(AWS_APP_CONFIG_NAME);
169-
resp.setAppId(appId);
170-
resp.setItems(Lists.newArrayList(item));
171-
return resp;
180+
private <T> void initSubscribe(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type) {
181+
if (!isInitialized(applicationName, configurationName)) {
182+
initConfig(applicationName, configurationName, group, label, metadata, type).block();
183+
}
184+
if (!isSubscribed(applicationName, configurationName)) {
185+
createSubscribe(applicationName, configurationName, type);
186+
}
172187
}
173188

174-
@Override
175-
public String stopSubscribe() {
176-
AwsCapaConfigurationScheduler.INSTANCE.configSubscribePollingScheduler.dispose();
177-
return "success";
189+
private synchronized <T> void createSubscribe(String applicationName, String configurationName, TypeRef<T> type) {
190+
if (isSubscribed(applicationName, configurationName)) {
191+
return;
192+
}
193+
Flux.create(fluxSink -> {
194+
AwsCapaConfigurationScheduler.INSTANCE.configSubscribePollingScheduler
195+
.schedulePeriodically(() -> {
196+
String version = getCurVersion(applicationName, configurationName);
197+
198+
GetConfigurationRequest request = GetConfigurationRequest.builder()
199+
.application(applicationName)
200+
.clientId(UUID.randomUUID().toString())
201+
.configuration(configurationName)
202+
.clientConfigurationVersion(version)
203+
.environment(DEFAULT_ENV)
204+
.build();
205+
206+
GetConfigurationResponse resp = null;
207+
try {
208+
resp = appConfigAsyncClient.getConfiguration(request).get();
209+
} catch (InterruptedException | ExecutionException e) {
210+
LOGGER.error("error occurs when getConfiguration,configurationName:{},version:{}", request.configuration(), request.clientConfigurationVersion(), e);
211+
}
212+
//update subscribed status if needs
213+
getConfiguration(applicationName, configurationName).getSubscribed().compareAndSet(false, true);
214+
215+
if (resp != null && !Objects.equals(resp.configurationVersion(), version)) {
216+
fluxSink.next(resp);
217+
}
218+
//todo:make the polling frequency configurable
219+
}, 0, 1, TimeUnit.SECONDS);
220+
})
221+
.publishOn(AwsCapaConfigurationScheduler.INSTANCE.configPublisherScheduler)
222+
.map(origin -> {
223+
GetConfigurationResponse resp = (GetConfigurationResponse) origin;
224+
Configuration configuration = updateConfigurationItem(applicationName, configurationName, type, resp.content(), resp.configurationVersion());
225+
return configuration == null ? Configuration.EMPTY : configuration;
226+
}).filter(resp -> resp != Configuration.EMPTY)
227+
.subscribe(resp -> {
228+
resp.triggers(resp.getConfigurationItem());
229+
});
178230
}
179231

180-
@Override
181-
public void close() {
182-
//no need
232+
private <T> Flux<SubscribeResp<T>> doSub(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type, String appId) {
233+
Configuration<?> configuration = getConfiguration(applicationName, configurationName);
234+
return Flux.create(fluxSink -> {
235+
configuration.addListener(configurationItem -> {
236+
fluxSink.next(configurationItem);
237+
});
238+
})
239+
.map(resp -> (ConfigurationItem<T>) resp)
240+
.map(resp -> convert(resp, appId));
241+
}
242+
243+
private <T> SubscribeResp<T> convert(ConfigurationItem<T> conf, String appId) {
244+
SubscribeResp<T> subscribeResp = new SubscribeResp<>();
245+
subscribeResp.setItems(Lists.newArrayList(conf));
246+
subscribeResp.setAppId(appId);
247+
subscribeResp.setStoreName(AWS_APP_CONFIG_NAME);
248+
return subscribeResp;
183249
}
184250

185251
/**
186252
* get current version
187253
* ps:version can be null
188254
*
189-
* @param applicationName
190-
* @param configurationName
255+
* @param applicationName applicationName
256+
* @param configurationName configurationName
191257
* @return current version
192258
*/
193259
private String getCurVersion(String applicationName, String configurationName) {
@@ -208,34 +274,81 @@ private ConfigurationItem<?> getCurConfigurationItem(String applicationName, Str
208274
return configurationItem;
209275
}
210276

211-
private synchronized <T> ConfigurationItem<T> updateConfigurationItem(String applicationName, String configurationName, TypeRef<T> type, SdkBytes contentSdkBytes, String version) {
277+
/**
278+
* @param applicationName applicationName
279+
* @param configurationName configurationName
280+
* @param type type of content
281+
* @param contentSdkBytes content value with SdkBytes type
282+
* @param version new version
283+
* @param <T> T
284+
* @return return the new value or null if not actually update
285+
*/
286+
private <T> Configuration<T> updateConfigurationItem(String applicationName, String configurationName, TypeRef<T> type, SdkBytes contentSdkBytes, String version) {
212287
ConcurrentHashMap<String, Configuration<?>> configMap = versionMap.get(applicationName);
213-
if (configMap == null) {
214-
configMap = new ConcurrentHashMap<>();
215-
T content = serializerProcessor.deserialize(contentSdkBytes, type, configurationName);
216288

217-
Configuration<T> configuration = new Configuration<>();
289+
//in fact,configMap.get(configurationName) is always not null, as it has been initialized in initialization process
290+
Configuration<T> configuration = (Configuration<T>) configMap.get(configurationName);
291+
292+
synchronized (configuration.lock) {
293+
//check whether content has been updated by other thread
294+
if (configMap.containsKey(configurationName) && Objects.equals(configMap.get(configurationName).getClientConfigurationVersion(), version)) {
295+
return null;
296+
}
297+
//do need to update
298+
T content = serializerProcessor.deserialize(contentSdkBytes, type, configurationName);
218299
configuration.setClientConfigurationVersion(version);
219300

220-
ConfigurationItem<T> configurationItem = new ConfigurationItem<>();
221-
configurationItem.setKey(configurationName);
301+
ConfigurationItem<T> configurationItem = Optional.ofNullable(configuration.getConfigurationItem()).orElse(new ConfigurationItem<>());
222302
configurationItem.setContent(content);
223303
configuration.setConfigurationItem(configurationItem);
224304

225305
configMap.put(configurationName, configuration);
306+
return configuration;
307+
}
308+
}
309+
310+
private <T> Configuration<T> initConfigurationItem(String applicationName, String configurationName, TypeRef<T> type, SdkBytes contentSdkBytes, String version) {
311+
ConcurrentHashMap<String, Configuration<?>> configMap = versionMap.get(applicationName);
312+
boolean initApplication = false;
313+
if (configMap == null) {
314+
configMap = new ConcurrentHashMap<>();
315+
initApplication = true;
316+
}
317+
318+
Configuration<T> configuration = new Configuration<>();
319+
configuration.setClientConfigurationVersion(version);
320+
configuration.getInitialized().compareAndSet(false, true);
321+
322+
ConfigurationItem<T> configurationItem = new ConfigurationItem<>();
323+
configurationItem.setKey(configurationName);
324+
T content = serializerProcessor.deserialize(contentSdkBytes, type, configurationName);
325+
configurationItem.setContent(content);
326+
configuration.setConfigurationItem(configurationItem);
327+
328+
configMap.put(configurationName, configuration);
329+
330+
if (initApplication) {
226331
versionMap.put(applicationName, configMap);
227-
return configurationItem;
228-
} else {
229-
T content = serializerProcessor.deserialize(contentSdkBytes, type, configurationName);
230-
Configuration<T> configuration = new Configuration<>();
231-
configuration.setClientConfigurationVersion(version);
332+
}
333+
return configuration;
334+
}
232335

233-
ConfigurationItem<T> configurationItem = new ConfigurationItem<>();
234-
configurationItem.setKey(configurationName);
235-
configurationItem.setContent(content);
236-
configuration.setConfigurationItem(configurationItem);
237-
configMap.put(configurationName, configuration);
238-
return configurationItem;
336+
private boolean isInitialized(String applicationName, String configurationName) {
337+
ConcurrentHashMap<String, Configuration<?>> configMap = versionMap.get(applicationName);
338+
return configMap != null && configMap.containsKey(configurationName) && configMap.get(configurationName).getInitialized().get();
339+
}
340+
341+
private boolean isSubscribed(String applicationName, String configurationName) {
342+
ConcurrentHashMap<String, Configuration<?>> configMap = versionMap.get(applicationName);
343+
return configMap != null && configMap.containsKey(configurationName) && configMap.get(configurationName).getInitialized().get() && configMap.get(configurationName).getSubscribed().get();
344+
}
345+
346+
private Configuration<?> getConfiguration(String applicationName, String configurationName) {
347+
ConcurrentHashMap<String, Configuration<?>> configMap = versionMap.get(applicationName);
348+
if (configMap != null && configMap.containsKey(configurationName)) {
349+
return configMap.get(configurationName);
239350
}
351+
return Configuration.EMPTY;
240352
}
353+
241354
}

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/entity/Configuration.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,58 @@
1717
package group.rxcloud.capa.spi.aws.config.entity;
1818

1919
import group.rxcloud.capa.component.configstore.ConfigurationItem;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.util.concurrent.CopyOnWriteArraySet;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2025

2126
/**
2227
* @author Reckless Xu
2328
*/
2429
public class Configuration<T> {
30+
private static final Logger LOGGER = LoggerFactory.getLogger(Configuration.class);
31+
32+
public static final Configuration<Void> EMPTY = new Configuration<>();
33+
2534
private String clientConfigurationVersion;
2635

2736
private ConfigurationItem<T> configurationItem;
2837

38+
public final Object lock = new Object();
39+
40+
private final CopyOnWriteArraySet<ConfigurationListener<T>> listeners = new CopyOnWriteArraySet<>();
41+
42+
private AtomicBoolean initialized = new AtomicBoolean(false);
43+
44+
private AtomicBoolean subscribed = new AtomicBoolean(false);
45+
46+
public synchronized void addListener(ConfigurationListener<T> listener) {
47+
if (initialized.get()) {
48+
trigger(listener, configurationItem);
49+
}
50+
listeners.add(listener);
51+
}
52+
53+
public boolean triggers(ConfigurationItem<T> data) {
54+
boolean result = true;
55+
for (ConfigurationListener<T> listener : listeners) {
56+
if (!trigger(listener, data)) result = false;
57+
}
58+
return result;
59+
}
60+
61+
private boolean trigger(ConfigurationListener<T> listener, ConfigurationItem<T> data) {
62+
try {
63+
listener.onLoad(data);
64+
return true;
65+
} catch (Exception e) {
66+
LOGGER.error("listener onLoad error,fileName:{},", data.getKey(), e);
67+
return false;
68+
}
69+
}
70+
71+
2972
public String getClientConfigurationVersion() {
3073
return clientConfigurationVersion;
3174
}
@@ -41,4 +84,12 @@ public ConfigurationItem<T> getConfigurationItem() {
4184
public void setConfigurationItem(ConfigurationItem<T> configurationItem) {
4285
this.configurationItem = configurationItem;
4386
}
87+
88+
public AtomicBoolean getInitialized() {
89+
return initialized;
90+
}
91+
92+
public AtomicBoolean getSubscribed() {
93+
return subscribed;
94+
}
4495
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package group.rxcloud.capa.spi.aws.config.entity;
18+
19+
import group.rxcloud.capa.component.configstore.ConfigurationItem;
20+
21+
/**
22+
* @author Reckless Xu
23+
* 2021/11/22
24+
*/
25+
public interface ConfigurationListener<T> {
26+
void onLoad(ConfigurationItem<T> configurationItem);
27+
}

0 commit comments

Comments
 (0)