Skip to content

Commit c33438f

Browse files
authored
Merge pull request #74 from reactivegroup/feature/metrics
upgrade log throwable
2 parents 384cc0f + 236a374 commit c33438f

File tree

23 files changed

+469
-216
lines changed

23 files changed

+469
-216
lines changed

capa-spi-aws-config/pom.xml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
<parent>
2424
<artifactId>capa-aws-parent</artifactId>
2525
<groupId>group.rxcloud</groupId>
26-
<version>1.10.12.2.RELEASE</version>
26+
<version>1.11.13.2.RELEASE</version>
2727
</parent>
2828

2929
<artifactId>capa-spi-aws-config</artifactId>
@@ -39,6 +39,21 @@
3939
<artifactId>appconfig</artifactId>
4040
</dependency>
4141

42+
<dependency>
43+
<groupId>software.amazon.awssdk</groupId>
44+
<artifactId>sts</artifactId>
45+
<exclusions>
46+
<exclusion>
47+
<groupId>io.netty</groupId>
48+
<artifactId>netty-codec-http2</artifactId>
49+
</exclusion>
50+
<exclusion>
51+
<groupId>software.amazon.awssdk</groupId>
52+
<artifactId>netty-nio-client</artifactId>
53+
</exclusion>
54+
</exclusions>
55+
</dependency>
56+
4257
<dependency>
4358
<groupId>com.google.guava</groupId>
4459
<artifactId>guava</artifactId>

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/AwsCapaConfigStore.java

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -128,27 +128,20 @@ public void close() {
128128
*/
129129
@Override
130130
protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group, String label, List<String> keys, Map<String, String> metadata, TypeRef<T> type) {
131-
List<ConfigurationItem<T>> items = new ArrayList<>();
132131
if (CollectionUtils.isNullOrEmpty(keys)) {
133-
LOGGER.warn("[Capa.Config] keys is null or empty,appId:{}", appId);
132+
LOGGER.warn("[[type=Capa.Config]] keys is null or empty,appId:{}", appId);
134133
return Mono.error(new CapaException(CapaErrorContext.PARAMETER_ERROR, "keys is null or empty"));
135134
}
136135

137136
String applicationName = String.format(APPCONFIG_NAME_FORMAT, appId, CapaFoundation.getEnv(FoundationType.TRIP));
138137
String configurationName = keys.get(0);
139138

140-
GetConfigurationRequest request = GetConfigurationRequest.builder()
141-
.application(applicationName)
142-
.clientId(UUID.randomUUID().toString())
143-
.configuration(configurationName)
144-
.environment(AwsCapaConfigurationProperties.AppConfigProperties.Settings.getConfigAwsAppConfigEnv())
145-
.build();
139+
//init config and create subscribe polling
140+
initAndSubscribe(applicationName, configurationName, group, label, metadata, type);
146141

147-
return Mono.fromCallable(() -> {
148-
GetConfigurationResponse response = appConfigAsyncClient.getConfiguration(request).get(REQUEST_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
149-
return convertToConfigurationList(response, configurationName, type);
150-
})
151-
.doOnError(e -> LOGGER.error("[Capa.Config.getConfiguration] error occurs when getconfiguration, request:{}", request, e));
142+
//get current config from local
143+
ConfigurationItem<T> configurationItem = (ConfigurationItem<T>) getConfiguration(applicationName, configurationName).getConfigurationItem();
144+
return Mono.just(Lists.newArrayList(configurationItem));
152145
}
153146

154147
@Override
@@ -157,7 +150,7 @@ protected <T> Flux<SubscribeResp<T>> doSubscribe(String appId, String group, Str
157150
String applicationName = String.format(APPCONFIG_NAME_FORMAT, appId, CapaFoundation.getEnv(FoundationType.TRIP));
158151
String configurationName = keys.get(0);
159152

160-
initSubscribe(applicationName, configurationName, group, label, metadata, type);
153+
initAndSubscribe(applicationName, configurationName, group, label, metadata, type);
161154
return doSub(applicationName, configurationName, group, label, metadata, type, appId);
162155
}
163156

@@ -175,7 +168,7 @@ protected <T> Flux<SubscribeResp<T>> doSubscribe(String appId, String group, Str
175168
private synchronized <T> Configuration<T> initConfig(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type) {
176169
// double check whether has been initialized
177170
if (isInitialized(applicationName, configurationName)) {
178-
LOGGER.info("[Capa.Config.initConfig] config has been initialized before,applicationName:{},configurationName:{}", applicationName, configurationName);
171+
LOGGER.info("[[type=Capa.Config.initConfig]] config has been initialized before,applicationName:{},configurationName:{}", applicationName, configurationName);
179172
return Configuration.EMPTY;
180173
}
181174
String version = getCurVersion(applicationName, configurationName);
@@ -188,28 +181,28 @@ private synchronized <T> Configuration<T> initConfig(String applicationName, Str
188181
.environment(AwsCapaConfigurationProperties.AppConfigProperties.Settings.getConfigAwsAppConfigEnv())
189182
.build();
190183

191-
LOGGER.debug("[Capa.Config.initConfig] call getconfiguration in init process,request:{}", request);
184+
LOGGER.debug("[[type=Capa.Config.initConfig]] call getconfiguration in init process,request:{}", request);
192185

193186
return Mono.fromCallable(() -> {
194187
GetConfigurationResponse response = appConfigAsyncClient.getConfiguration(request).get(REQUEST_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
195-
LOGGER.debug("[Capa.Config.initConfig] call getconfiguration in init process,response:{}", response);
188+
LOGGER.debug("[[type=Capa.Config.initConfig]] call getconfiguration in init process,response:{}", response);
196189
return response;
197190
})
198-
.doOnError(e -> LOGGER.error("[initConfig] error occurs when getconfiguration in init process, request:{}", request, e))
191+
.doOnError(e -> LOGGER.error("[[type=Capa.Config.initConfig]] error occurs when getconfiguration in init process, request:{}", request, e))
199192
.map(resp -> initConfigurationItem(applicationName, configurationName, type, resp.content(), resp.configurationVersion()))
200193
.block();
201194
}
202195

203-
private <T> void initSubscribe(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type) {
196+
private <T> void initAndSubscribe(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type) {
204197
if (!isInitialized(applicationName, configurationName)) {
205198
initConfig(applicationName, configurationName, group, label, metadata, type);
206199
}
207200
if (!isSubscribed(applicationName, configurationName)) {
208-
createSubscribe(applicationName, configurationName, type);
201+
createSubscribePolling(applicationName, configurationName, type);
209202
}
210203
}
211204

212-
private synchronized <T> void createSubscribe(String applicationName, String configurationName, TypeRef<T> type) {
205+
private synchronized <T> void createSubscribePolling(String applicationName, String configurationName, TypeRef<T> type) {
213206
if (isSubscribed(applicationName, configurationName)) {
214207
return;
215208
}
@@ -229,17 +222,17 @@ private synchronized <T> void createSubscribe(String applicationName, String con
229222
.clientConfigurationVersion(version)
230223
.environment(AwsCapaConfigurationProperties.AppConfigProperties.Settings.getConfigAwsAppConfigEnv())
231224
.build();
232-
LOGGER.debug("[Capa.Config.subscribePolling] subscribe polling task start,request:{}", request);
225+
LOGGER.debug("[[type=Capa.Config.subscribePolling]] subscribe polling task start,request:{}", request);
233226

234227
GetConfigurationResponse resp = null;
235228
try {
236229
resp = appConfigAsyncClient.getConfiguration(request).get(REQUEST_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
237230
} catch (InterruptedException | ExecutionException | TimeoutException e) {
238231
//catch error,log error and not trigger listeners
239-
LOGGER.error("[Capa.Config.subscribePolling] error occurs when getConfiguration in polling process,configurationName:{},version:{}", request.configuration(), request.clientConfigurationVersion(), e);
232+
LOGGER.error("[[type=Capa.Config.subscribePolling]] error occurs when getConfiguration in polling process,configurationName:{},version:{}", request.configuration(), request.clientConfigurationVersion(), e);
240233
}
241234

242-
LOGGER.debug("[Capa.Config.subscribePolling] subscribe polling task end,response:{}", resp);
235+
LOGGER.debug("[[type=Capa.Config.subscribePolling]] subscribe polling task end,response:{}", resp);
243236

244237
if (resp != null && !Objects.equals(resp.configurationVersion(), version)) {
245238
fluxSink.next(resp);
@@ -255,7 +248,7 @@ private synchronized <T> void createSubscribe(String applicationName, String con
255248
})
256249
.filter(resp -> resp != Configuration.EMPTY)
257250
.subscribe(resp -> {
258-
LOGGER.info("[Capa.Config.triggerListener] receive changes and trigger listeners,response:{}", resp);
251+
LOGGER.info("[[type=Capa.Config.triggerListener]] receive changes and trigger listeners,response:{}", resp);
259252
resp.triggers(resp.getConfigurationItem());
260253
});
261254
}
@@ -268,7 +261,7 @@ private <T> Flux<SubscribeResp<T>> doSub(String applicationName, String configur
268261
return Flux
269262
.create(fluxSink -> {
270263
configuration.addListener(configurationItem -> {
271-
LOGGER.info("[Capa.Config.listenerOnChange] listener onChanges, configurationItem:{}", configurationItem);
264+
LOGGER.info("[[type=Capa.Config.listenerOnChange]] listener onChanges, configurationItem key:{}", configurationItem.getKey());
272265
fluxSink.next(configurationItem);
273266
});
274267
})
@@ -330,7 +323,7 @@ private <T> Configuration<T> updateConfigurationItem(String applicationName, Str
330323
configuration.setConfigurationItem(configurationItem);
331324

332325
configMap.put(configurationName, configuration);
333-
LOGGER.info("[Capa.Config.updateConfig] update config,key configurationName:{},value configuration:{}", configurationName, configuration);
326+
LOGGER.info("[[type=Capa.Config.updateConfig]] update config,key configurationName:{},value configuration:{}", configurationName, configuration);
334327
return configuration;
335328
}
336329
}
@@ -355,11 +348,11 @@ private <T> Configuration<T> initConfigurationItem(String applicationName, Strin
355348

356349
configMap.put(configurationName, configuration);
357350

358-
LOGGER.info("[Capa.Config.initConfig] process initConfigurationItem,put key configurationName:{},value configuration:{}", configurationName, configuration);
351+
LOGGER.info("[[type=Capa.Config.initConfig]] process initConfigurationItem,put key configurationName:{},value configuration:{}", configurationName, configuration);
359352

360353
if (initApplication) {
361354
versionMap.put(applicationName, configMap);
362-
LOGGER.info("[Capa.Config.initConfig] process initConfigurationItem,put key applicationName:{}", applicationName);
355+
LOGGER.info("[[type=Capa.Config.initConfig]] process initConfigurationItem,put key applicationName:{}", applicationName);
363356
}
364357
return configuration;
365358
}

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/entity/Configuration.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,8 @@ public class Configuration<T> {
3636
public String toString() {
3737
return "Configuration{" +
3838
"clientConfigurationVersion='" + clientConfigurationVersion + '\'' +
39-
", configurationItem=" + configurationItem +
40-
", lock=" + lock +
41-
", listeners=" + listeners +
39+
", configurationItem=" + configurationItem.getContent().toString() +
40+
", listenersCount=" + listeners.size() +
4241
", initialized=" + initialized +
4342
", subscribed=" + subscribed +
4443
'}';

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/serializer/PropertiesSerializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import java.io.IOException;
2525
import java.io.InputStream;
26-
import java.util.LinkedHashMap;
26+
import java.util.HashMap;
2727
import java.util.Map;
2828
import java.util.Properties;
2929

@@ -47,7 +47,7 @@ private Map<String, String> parsePropertiesToMap(InputStream inputStream) {
4747
} catch (IOException e) {
4848
LOGGER.error("properties load error", e);
4949
}
50-
Map<String, String> map = new LinkedHashMap<>(properties.size());
50+
Map<String, String> map = new HashMap<>(properties.size());
5151
for (String key : properties.stringPropertyNames()) {
5252
map.put(key, properties.getProperty(key));
5353
}

capa-spi-aws-config/src/test/java/group/rxcloud/capa/spi/aws/config/AwsCapaConfigStoreIgnoreTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,21 @@ void testDoGet(){
5252
Mono<List<ConfigurationItem<User>>> mono2 = ins.doGet("100012345", "", "", Lists.newArrayList("test1.json"), new HashMap<>(), TypeRef.get(User.class));
5353
System.out.println(mono2.block().get(0).getContent().getAge());
5454

55+
Flux<SubscribeResp<User>> flux1 = ins.doSubscribe("100012345", "", "", Lists.newArrayList("test1.json"), new HashMap<>(), TypeRef.get(User.class));
56+
flux1.subscribe(resp -> {
57+
System.out.println("1:"+resp.getItems().get(0).getContent().getAge());
58+
});
59+
60+
long start = System.currentTimeMillis();
61+
while(System.currentTimeMillis()-start<20000){
62+
63+
}
64+
//change config
65+
Mono<List<ConfigurationItem<User>>> mono3 = ins.doGet("100012345", "", "", Lists.newArrayList("test1.json"), new HashMap<>(), TypeRef.get(User.class));
66+
System.out.println(mono3.block().get(0).getContent().getAge());
67+
68+
System.out.println("");
69+
5570
// Mono<List<ConfigurationItem<User>>> mono = ins.doGet("100012345", "", "", Lists.newArrayList("test1.json"), new HashMap<>(), TypeRef.get(User.class));
5671
// System.out.println(mono.block().get(0).getContent().getAge());
5772
//

capa-spi-aws-infrastructure/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
<parent>
2424
<artifactId>capa-aws-parent</artifactId>
2525
<groupId>group.rxcloud</groupId>
26-
<version>1.10.12.2.RELEASE</version>
26+
<version>1.11.13.2.RELEASE</version>
2727
</parent>
2828

2929
<artifactId>capa-spi-aws-infrastructure</artifactId>

capa-spi-aws-log/pom.xml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
<parent>
2424
<artifactId>capa-aws-parent</artifactId>
2525
<groupId>group.rxcloud</groupId>
26-
<version>1.10.12.2.RELEASE</version>
26+
<version>1.11.13.2.RELEASE</version>
2727
</parent>
2828

2929
<artifactId>capa-spi-aws-log</artifactId>
@@ -56,6 +56,21 @@
5656
<artifactId>cloudwatch</artifactId>
5757
</dependency>
5858

59+
<dependency>
60+
<groupId>software.amazon.awssdk</groupId>
61+
<artifactId>sts</artifactId>
62+
<exclusions>
63+
<exclusion>
64+
<groupId>io.netty</groupId>
65+
<artifactId>netty-codec-http2</artifactId>
66+
</exclusion>
67+
<exclusion>
68+
<groupId>software.amazon.awssdk</groupId>
69+
<artifactId>netty-nio-client</artifactId>
70+
</exclusion>
71+
</exclusions>
72+
</dependency>
73+
5974
<!-- fixme: can we only use jackson -->
6075
<dependency>
6176
<groupId>com.google.code.gson</groupId>

capa-spi-aws-log/src/main/java/group/rxcloud/capa/spi/aws/log/appender/CapaAwsLog4jAppender.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package group.rxcloud.capa.spi.aws.log.appender;
1818

1919
import group.rxcloud.capa.infrastructure.hook.Mixer;
20-
import group.rxcloud.capa.infrastructure.hook.TelemetryHooks;
2120
import group.rxcloud.capa.spi.aws.log.enums.CapaLogLevel;
21+
import group.rxcloud.capa.spi.aws.log.manager.CustomLogManager;
2222
import group.rxcloud.capa.spi.aws.log.manager.LogAppendManager;
2323
import group.rxcloud.capa.spi.aws.log.manager.LogManager;
2424
import group.rxcloud.capa.spi.log.CapaLog4jAppenderSpi;
@@ -33,43 +33,57 @@
3333
import java.util.HashMap;
3434
import java.util.Map;
3535
import java.util.Optional;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3637

3738
public class CapaAwsLog4jAppender extends CapaLog4jAppenderSpi {
39+
3840
/**
3941
* The error type name of the log4j appender.
4042
*/
4143
protected static final String LOG_LOG4J_APPENDER_ERROR_TYPE = "Log4jAppendLogsError";
44+
4245
/**
4346
* Number of counts each time.
4447
*/
4548
protected static final Integer COUNTER_NUM = 1;
46-
/**
47-
* The instance of the {@link TelemetryHooks}.
48-
*/
49-
private static final Optional<TelemetryHooks> TELEMETRY_HOOKS;
49+
5050
/**
5151
* The namespace for logging error.
5252
* TODO Set variables to common variables
5353
*/
5454
private static final String LOG_ERROR_NAMESPACE = "CloudWatchLogs";
55+
5556
/**
5657
* The metric name for logging error.
5758
* TODO Set variables to common variables
5859
*/
5960
private static final String LOG_ERROR_METRIC_NAME = "LogError";
61+
62+
private static final AtomicBoolean METRIC_INIT = new AtomicBoolean(false);
63+
6064
/**
6165
* Init an instance of {@link LongCounter}.
6266
*/
6367
protected static Optional<LongCounter> LONG_COUNTER = Optional.empty();
6468

6569
static {
6670
PluginManager.addPackage("group.rxcloud.capa.spi.aws.log.appender");
67-
TELEMETRY_HOOKS = Mixer.telemetryHooksNullable();
68-
TELEMETRY_HOOKS.ifPresent(telemetryHooks -> {
69-
Meter meter = telemetryHooks.buildMeter(LOG_ERROR_NAMESPACE).block();
70-
LongCounter longCounter = meter.counterBuilder(LOG_ERROR_METRIC_NAME).build();
71-
LONG_COUNTER = Optional.ofNullable(longCounter);
72-
});
71+
}
72+
73+
static Optional<LongCounter> getCounterOpt() {
74+
if (METRIC_INIT.get()) {
75+
return LONG_COUNTER;
76+
}
77+
synchronized (METRIC_INIT) {
78+
if (METRIC_INIT.compareAndSet(false, true)) {
79+
Mixer.telemetryHooksNullable().ifPresent(telemetryHooks -> {
80+
Meter meter = telemetryHooks.buildMeter(LOG_ERROR_NAMESPACE).block();
81+
LongCounter longCounter = meter.counterBuilder(LOG_ERROR_METRIC_NAME).build();
82+
LONG_COUNTER = Optional.ofNullable(longCounter);
83+
});
84+
}
85+
}
86+
return LONG_COUNTER;
7387
}
7488

7589
@Override
@@ -81,20 +95,23 @@ public void appendLog(LogEvent event) {
8195
return;
8296
}
8397
Optional<CapaLogLevel> capaLogLevel = CapaLogLevel.toCapaLogLevel(event.getLevel().name());
84-
if(capaLogLevel.isPresent() && LogManager.logsCanOutput(capaLogLevel.get())){
98+
if (capaLogLevel.isPresent() && LogManager.logsCanOutput(capaLogLevel.get())) {
8599
String message = event.getMessage().getFormattedMessage();
86100
ReadOnlyStringMap contextData = event.getContextData();
87101
Map<String, String> MDCTags = contextData == null ? new HashMap<>() : contextData.toMap();
88-
LogAppendManager.appendLogs(message, MDCTags, event.getLevel().name());
102+
LogAppendManager.appendLogs(message, MDCTags, event.getLoggerName(), event.getThreadName(),
103+
event.getLevel().name(), event.getTimeMillis(), event.getThrown());
89104
}
90105
} catch (Exception e) {
91106
try {
107+
CustomLogManager.error("CapaAwsLog4jAppender appender log error.", e);
92108
//Enhance function without affecting function
93-
LONG_COUNTER.ifPresent(longCounter -> {
94-
longCounter.bind(Attributes.of(AttributeKey.stringKey(LOG_LOG4J_APPENDER_ERROR_TYPE), LOG_LOG4J_APPENDER_ERROR_TYPE))
95-
.add(COUNTER_NUM);
109+
getCounterOpt().ifPresent(longCounter -> {
110+
longCounter.bind(Attributes
111+
.of(AttributeKey.stringKey(LOG_LOG4J_APPENDER_ERROR_TYPE), e.getClass().getName()))
112+
.add(COUNTER_NUM);
96113
});
97-
} finally {
114+
} catch (Throwable ex) {
98115
}
99116

100117
}

0 commit comments

Comments
 (0)