Skip to content

Commit a8619f7

Browse files
committed
refactor: refactor rpc and config settings
1 parent 08abec2 commit a8619f7

File tree

28 files changed

+292
-298
lines changed

28 files changed

+292
-298
lines changed

capa-spi-aws-config/pom.xml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,18 @@
3939
<groupId>software.amazon.awssdk</groupId>
4040
<artifactId>appconfig</artifactId>
4141
</dependency>
42+
43+
<dependency>
44+
<groupId>com.google.guava</groupId>
45+
<artifactId>guava</artifactId>
46+
</dependency>
47+
4248
<!-- unit test -->
4349
<dependency>
4450
<groupId>org.junit.jupiter</groupId>
4551
<artifactId>junit-jupiter-engine</artifactId>
4652
<scope>test</scope>
4753
</dependency>
48-
49-
<dependency>
50-
<groupId>com.google.guava</groupId>
51-
<artifactId>guava</artifactId>
52-
</dependency>
5354
</dependencies>
5455

5556
<build>

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/AwsCapaConfiguration.java renamed to capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/AwsCapaConfigStore.java

Lines changed: 65 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import group.rxcloud.capa.component.configstore.StoreConfig;
2222
import group.rxcloud.capa.component.configstore.SubscribeResp;
2323
import group.rxcloud.capa.infrastructure.serializer.CapaObjectSerializer;
24-
import group.rxcloud.capa.spi.aws.config.common.serializer.SerializerProcessor;
2524
import group.rxcloud.capa.spi.aws.config.entity.Configuration;
2625
import group.rxcloud.capa.spi.aws.config.scheduler.AwsCapaConfigurationScheduler;
26+
import group.rxcloud.capa.spi.aws.config.serializer.SerializerProcessor;
2727
import group.rxcloud.capa.spi.configstore.CapaConfigStoreSpi;
2828
import group.rxcloud.cloudruntimes.utils.TypeRef;
2929
import org.slf4j.Logger;
@@ -46,19 +46,13 @@
4646
import java.util.concurrent.ExecutionException;
4747
import java.util.concurrent.TimeUnit;
4848

49-
import static group.rxcloud.capa.spi.aws.config.common.constant.CapaAWSConstants.AWS_APP_CONFIG_NAME;
50-
import static group.rxcloud.capa.spi.aws.config.common.constant.CapaAWSConstants.DEFAULT_ENV;
5149

5250
/**
5351
* @author Reckless Xu
5452
*/
55-
public class AwsCapaConfiguration extends CapaConfigStoreSpi {
53+
public class AwsCapaConfigStore extends CapaConfigStoreSpi {
5654

57-
private static final Logger LOGGER = LoggerFactory.getLogger(AwsCapaConfiguration.class);
58-
59-
private AppConfigAsyncClient appConfigAsyncClient;
60-
61-
private SerializerProcessor serializerProcessor;
55+
private static final Logger LOGGER = LoggerFactory.getLogger(AwsCapaConfigStore.class);
6256

6357
/**
6458
* key of versionMap--applicationName,format:appid_ENV,e.g:"12345_FAT"
@@ -69,17 +63,31 @@ public class AwsCapaConfiguration extends CapaConfigStoreSpi {
6963
* <p>
7064
* ps:currentHashMap may not be necessary, as update in synchronized method
7165
*/
72-
private static Map<String, ConcurrentHashMap<String, Configuration<?>>> versionMap;
66+
private static final Map<String, ConcurrentHashMap<String, Configuration<?>>> versionMap;
67+
68+
static {
69+
versionMap = new ConcurrentHashMap<>();
70+
}
71+
72+
private final CapaObjectSerializer objectSerializer;
73+
74+
private SerializerProcessor serializerProcessor;
75+
76+
private AppConfigAsyncClient appConfigAsyncClient;
7377

7478
/**
7579
* Instantiates a new Capa configuration.
7680
*
7781
* @param objectSerializer Serializer for transient request/response objects.
7882
*/
79-
public AwsCapaConfiguration(CapaObjectSerializer objectSerializer) {
83+
public AwsCapaConfigStore(CapaObjectSerializer objectSerializer) {
8084
super(objectSerializer);
85+
this.objectSerializer = objectSerializer;
86+
}
87+
88+
@Override
89+
protected void doInit(StoreConfig storeConfig) {
8190
appConfigAsyncClient = AppConfigAsyncClient.create();
82-
versionMap = new ConcurrentHashMap<>();
8391
serializerProcessor = new SerializerProcessor(objectSerializer);
8492
}
8593

@@ -91,12 +99,7 @@ public String stopSubscribe() {
9199

92100
@Override
93101
public void close() {
94-
//no need
95-
}
96-
97-
@Override
98-
protected void doInit(StoreConfig storeConfig) {
99-
//no need
102+
// no need
100103
}
101104

102105
@Override
@@ -105,7 +108,7 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
105108
if (CollectionUtils.isNullOrEmpty(keys)) {
106109
return Mono.error(new IllegalArgumentException("keys is null or empty"));
107110
}
108-
//todo:need to get the specific env from system properties
111+
// todo:need to get the specific env from system properties
109112
String applicationName = appId + "_FAT";
110113
String configurationName = keys.get(0);
111114
String clientConfigurationVersion = getCurVersion(applicationName, configurationName);
@@ -115,17 +118,17 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
115118
.clientId(UUID.randomUUID().toString())
116119
.configuration(configurationName)
117120
.clientConfigurationVersion(clientConfigurationVersion)
118-
.environment(DEFAULT_ENV)
121+
.environment(AwsCapaConfigurationProperties.AppConfigProperties.Settings.getAwsAppConfigEnv())
119122
.build();
120123

121124
return Mono.fromFuture(() -> appConfigAsyncClient.getConfiguration(request))
122125
.publishOn(AwsCapaConfigurationScheduler.INSTANCE.configPublisherScheduler)
123126
.map(resp -> {
124-
//if version doesn't change, get from versionMap
127+
// if version doesn't change, get from versionMap
125128
if (Objects.equals(clientConfigurationVersion, resp.configurationVersion())) {
126129
items.add((ConfigurationItem<T>) getCurConfigurationItem(applicationName, configurationName));
127130
} else {
128-
//if version changes,update versionMap and return
131+
// if version changes,update versionMap and return
129132
Configuration<T> tConfiguration = updateConfigurationItem(applicationName, configurationName, type, resp.content(), resp.configurationVersion());
130133
if (tConfiguration != null) {
131134
items.add(tConfiguration.getConfigurationItem());
@@ -137,7 +140,7 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
137140

138141
@Override
139142
protected <T> Flux<SubscribeResp<T>> doSubscribe(String appId, String group, String label, List<String> keys, Map<String, String> metadata, TypeRef<T> type) {
140-
//todo:need to get the specific env from system properties
143+
// todo: need to get the specific env from system properties
141144
String applicationName = appId + "_FAT";
142145
String configurationName = keys.get(0);
143146

@@ -146,7 +149,7 @@ protected <T> Flux<SubscribeResp<T>> doSubscribe(String appId, String group, Str
146149
}
147150

148151
private synchronized <T> Mono<Boolean> initConfig(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type) {
149-
//double check whether has been initialized
152+
// double check whether has been initialized
150153
if (isInitialized(applicationName, configurationName)) {
151154
return Mono.just(true);
152155
}
@@ -160,7 +163,7 @@ private synchronized <T> Mono<Boolean> initConfig(String applicationName, String
160163
.clientId(UUID.randomUUID().toString())
161164
.configuration(configurationName)
162165
.clientConfigurationVersion(version)
163-
.environment(DEFAULT_ENV)
166+
.environment(AwsCapaConfigurationProperties.AppConfigProperties.Settings.getAwsAppConfigEnv())
164167
.build();
165168

166169
GetConfigurationResponse resp = null;
@@ -191,39 +194,40 @@ private synchronized <T> void createSubscribe(String applicationName, String con
191194
return;
192195
}
193196
Flux.create(fluxSink -> {
194-
AwsCapaConfigurationScheduler.INSTANCE.configSubscribePollingScheduler
195-
.schedulePeriodically(() -> {
196-
String version = getCurVersion(applicationName, configurationName);
197-
198-
GetConfigurationRequest request = GetConfigurationRequest.builder()
199-
.application(applicationName)
200-
.clientId(UUID.randomUUID().toString())
201-
.configuration(configurationName)
202-
.clientConfigurationVersion(version)
203-
.environment(DEFAULT_ENV)
204-
.build();
205-
206-
GetConfigurationResponse resp = null;
207-
try {
208-
resp = appConfigAsyncClient.getConfiguration(request).get();
209-
} catch (InterruptedException | ExecutionException e) {
210-
LOGGER.error("error occurs when getConfiguration,configurationName:{},version:{}", request.configuration(), request.clientConfigurationVersion(), e);
211-
}
212-
//update subscribed status if needs
213-
getConfiguration(applicationName, configurationName).getSubscribed().compareAndSet(false, true);
214-
215-
if (resp != null && !Objects.equals(resp.configurationVersion(), version)) {
216-
fluxSink.next(resp);
217-
}
218-
//todo:make the polling frequency configurable
219-
}, 0, 1, TimeUnit.SECONDS);
220-
})
197+
AwsCapaConfigurationScheduler.INSTANCE.configSubscribePollingScheduler
198+
.schedulePeriodically(() -> {
199+
String version = getCurVersion(applicationName, configurationName);
200+
201+
GetConfigurationRequest request = GetConfigurationRequest.builder()
202+
.application(applicationName)
203+
.clientId(UUID.randomUUID().toString())
204+
.configuration(configurationName)
205+
.clientConfigurationVersion(version)
206+
.environment(AwsCapaConfigurationProperties.AppConfigProperties.Settings.getAwsAppConfigEnv())
207+
.build();
208+
209+
GetConfigurationResponse resp = null;
210+
try {
211+
resp = appConfigAsyncClient.getConfiguration(request).get();
212+
} catch (InterruptedException | ExecutionException e) {
213+
LOGGER.error("error occurs when getConfiguration,configurationName:{},version:{}", request.configuration(), request.clientConfigurationVersion(), e);
214+
}
215+
// update subscribed status if needs
216+
getConfiguration(applicationName, configurationName).getSubscribed().compareAndSet(false, true);
217+
218+
if (resp != null && !Objects.equals(resp.configurationVersion(), version)) {
219+
fluxSink.next(resp);
220+
}
221+
// todo: make the polling frequency configurable
222+
}, 0, 1, TimeUnit.SECONDS);
223+
})
221224
.publishOn(AwsCapaConfigurationScheduler.INSTANCE.configPublisherScheduler)
222225
.map(origin -> {
223226
GetConfigurationResponse resp = (GetConfigurationResponse) origin;
224227
Configuration configuration = updateConfigurationItem(applicationName, configurationName, type, resp.content(), resp.configurationVersion());
225228
return configuration == null ? Configuration.EMPTY : configuration;
226-
}).filter(resp -> resp != Configuration.EMPTY)
229+
})
230+
.filter(resp -> resp != Configuration.EMPTY)
227231
.subscribe(resp -> {
228232
resp.triggers(resp.getConfigurationItem());
229233
});
@@ -232,10 +236,10 @@ private synchronized <T> void createSubscribe(String applicationName, String con
232236
private <T> Flux<SubscribeResp<T>> doSub(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type, String appId) {
233237
Configuration<?> configuration = getConfiguration(applicationName, configurationName);
234238
return Flux.create(fluxSink -> {
235-
configuration.addListener(configurationItem -> {
236-
fluxSink.next(configurationItem);
237-
});
238-
})
239+
configuration.addListener(configurationItem -> {
240+
fluxSink.next(configurationItem);
241+
});
242+
})
239243
.map(resp -> (ConfigurationItem<T>) resp)
240244
.map(resp -> convert(resp, appId));
241245
}
@@ -244,7 +248,7 @@ private <T> SubscribeResp<T> convert(ConfigurationItem<T> conf, String appId) {
244248
SubscribeResp<T> subscribeResp = new SubscribeResp<>();
245249
subscribeResp.setItems(Lists.newArrayList(conf));
246250
subscribeResp.setAppId(appId);
247-
subscribeResp.setStoreName(AWS_APP_CONFIG_NAME);
251+
subscribeResp.setStoreName(AwsCapaConfigurationProperties.AppConfigProperties.Settings.getAwsAppConfigName());
248252
return subscribeResp;
249253
}
250254

@@ -286,15 +290,15 @@ private ConfigurationItem<?> getCurConfigurationItem(String applicationName, Str
286290
private <T> Configuration<T> updateConfigurationItem(String applicationName, String configurationName, TypeRef<T> type, SdkBytes contentSdkBytes, String version) {
287291
ConcurrentHashMap<String, Configuration<?>> configMap = versionMap.get(applicationName);
288292

289-
//in fact,configMap.get(configurationName) is always not null, as it has been initialized in initialization process
293+
// in fact,configMap.get(configurationName) is always not null, as it has been initialized in initialization process
290294
Configuration<T> configuration = (Configuration<T>) configMap.get(configurationName);
291295

292296
synchronized (configuration.lock) {
293-
//check whether content has been updated by other thread
297+
// check whether content has been updated by other thread
294298
if (configMap.containsKey(configurationName) && Objects.equals(configMap.get(configurationName).getClientConfigurationVersion(), version)) {
295299
return null;
296300
}
297-
//do need to update
301+
// do need to update
298302
T content = serializerProcessor.deserialize(contentSdkBytes, type, configurationName);
299303
configuration.setClientConfigurationVersion(version);
300304

@@ -350,5 +354,4 @@ private Configuration<?> getConfiguration(String applicationName, String configu
350354
}
351355
return Configuration.EMPTY;
352356
}
353-
354357
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package group.rxcloud.capa.spi.aws.config;
18+
19+
import group.rxcloud.capa.infrastructure.config.CapaProperties;
20+
21+
import java.util.Properties;
22+
23+
/**
24+
* @author Reckless Xu
25+
*/
26+
public interface AwsCapaConfigurationProperties {
27+
28+
interface AppConfigProperties {
29+
30+
abstract class Settings {
31+
32+
private static String awsAppConfigName = "AWS AppConfig";
33+
private static String awsAppConfigEnv = "ENV";
34+
35+
private static final String RPC_AWS_APP_CONFIG_NAME = "RPC_AWS_APP_CONFIG_NAME";
36+
private static final String RPC_AWS_APP_CONFIG_ENV = "RPC_AWS_APP_CONFIG_ENV";
37+
38+
static {
39+
Properties properties = CapaProperties.COMPONENT_PROPERTIES_SUPPLIER.apply("configuration-aws");
40+
41+
awsAppConfigName = properties.getProperty(RPC_AWS_APP_CONFIG_NAME, awsAppConfigName);
42+
43+
awsAppConfigEnv = properties.getProperty(RPC_AWS_APP_CONFIG_ENV, awsAppConfigEnv);
44+
}
45+
46+
public static String getAwsAppConfigName() {
47+
return awsAppConfigName;
48+
}
49+
50+
public static String getAwsAppConfigEnv() {
51+
return awsAppConfigEnv;
52+
}
53+
54+
private Settings() {
55+
56+
}
57+
}
58+
}
59+
}

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/common/constant/CapaAWSConstants.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/entity/Configuration.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
* @author Reckless Xu
2828
*/
2929
public class Configuration<T> {
30+
3031
private static final Logger LOGGER = LoggerFactory.getLogger(Configuration.class);
3132

3233
public static final Configuration<Void> EMPTY = new Configuration<>();
@@ -53,7 +54,9 @@ public synchronized void addListener(ConfigurationListener<T> listener) {
5354
public boolean triggers(ConfigurationItem<T> data) {
5455
boolean result = true;
5556
for (ConfigurationListener<T> listener : listeners) {
56-
if (!trigger(listener, data)) result = false;
57+
if (!trigger(listener, data)) {
58+
result = false;
59+
}
5760
}
5861
return result;
5962
}
@@ -68,7 +71,6 @@ private boolean trigger(ConfigurationListener<T> listener, ConfigurationItem<T>
6871
}
6972
}
7073

71-
7274
public String getClientConfigurationVersion() {
7375
return clientConfigurationVersion;
7476
}

0 commit comments

Comments
 (0)