Skip to content

Commit 3fb6fd7

Browse files
authored
Merge pull request #55 from reactivegroup/feature/metrics
upgrade cloud watch
2 parents 4517e2e + a2a48c4 commit 3fb6fd7

File tree

16 files changed

+696
-53
lines changed

16 files changed

+696
-53
lines changed

capa-spi-aws-config/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@
5050
<artifactId>guava</artifactId>
5151
</dependency>
5252

53+
<!--http-->
54+
<dependency>
55+
<groupId>org.apache.httpcomponents</groupId>
56+
<artifactId>httpclient</artifactId>
57+
</dependency>
58+
<dependency>
59+
<groupId>org.apache.httpcomponents</groupId>
60+
<artifactId>httpcore</artifactId>
61+
</dependency>
62+
5363
<!-- unit test -->
5464
<dependency>
5565
<groupId>org.junit.jupiter</groupId>

capa-spi-aws-log/pom.xml

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@
3333
<log4j.version>2.14.1</log4j.version>
3434
<logback.version>1.2.3</logback.version>
3535
<gson.version>2.8.0</gson.version>
36+
<sentinel.core.version>1.8.2</sentinel.core.version>
3637
</properties>
3738

3839
<dependencies>
3940
<dependency>
4041
<groupId>group.rxcloud</groupId>
4142
<artifactId>capa-spi-aws-infrastructure</artifactId>
4243
</dependency>
43-
4444
<dependency>
4545
<groupId>group.rxcloud</groupId>
4646
<artifactId>capa-foundation</artifactId>
@@ -88,7 +88,26 @@
8888
<version>${log4j.version}</version>
8989
<optional>true</optional>
9090
</dependency>
91-
91+
<!--http-->
92+
<dependency>
93+
<groupId>org.apache.httpcomponents</groupId>
94+
<artifactId>httpclient</artifactId>
95+
</dependency>
96+
<dependency>
97+
<groupId>org.apache.httpcomponents</groupId>
98+
<artifactId>httpcore</artifactId>
99+
</dependency>
100+
<dependency>
101+
<groupId>com.alibaba.csp</groupId>
102+
<artifactId>sentinel-core</artifactId>
103+
<version>${sentinel.core.version}</version>
104+
</dependency>
105+
<dependency>
106+
<groupId>org.projectlombok</groupId>
107+
<artifactId>lombok</artifactId>
108+
<scope>test</scope>
109+
<version>1.18.22</version>
110+
</dependency>
92111
<!-- unit test -->
93112
<dependency>
94113
<groupId>org.junit.jupiter</groupId>

capa-spi-aws-log/src/main/java/group/rxcloud/capa/spi/aws/log/appender/CapaAwsLog4jAppender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void appendLog(LogEvent event) {
8686
try {
8787
//Enhance function without affecting function
8888
LONG_COUNTER.ifPresent(longCounter -> {
89-
longCounter.bind(Attributes.of(AttributeKey.stringKey(LOG_LOG4J_APPENDER_ERROR_TYPE), e.getMessage()))
89+
longCounter.bind(Attributes.of(AttributeKey.stringKey(LOG_LOG4J_APPENDER_ERROR_TYPE), LOG_LOG4J_APPENDER_ERROR_TYPE))
9090
.add(COUNTER_NUM);
9191
});
9292
} finally {

capa-spi-aws-log/src/main/java/group/rxcloud/capa/spi/aws/log/appender/CapaAwsLogbackAppender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void appendLog(ILoggingEvent event) {
8181
LONG_COUNTER.ifPresent(longCounter -> {
8282
try {
8383
//Enhance function without affecting function
84-
longCounter.bind(Attributes.of(AttributeKey.stringKey(LOG_LOGBACK_APPENDER_ERROR_TYPE), e.getMessage()))
84+
longCounter.bind(Attributes.of(AttributeKey.stringKey(LOG_LOGBACK_APPENDER_ERROR_TYPE), LOG_LOGBACK_APPENDER_ERROR_TYPE))
8585
.add(COUNTER_NUM);
8686
} finally {
8787
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package group.rxcloud.capa.spi.aws.log.handle;
18+
19+
import java.util.LinkedList;
20+
import java.util.Queue;
21+
22+
public class ChunkQueue {
23+
private final int maxBytes;
24+
private final Queue<CompressedChunk> queue = new LinkedList<>();
25+
private int bytes;
26+
private int chunkCount = 0;
27+
28+
public ChunkQueue(int maxBytes) {
29+
this.maxBytes = maxBytes;
30+
}
31+
32+
public int drainTo(LinkedList<CompressedChunk> collection, int maxElements, int maxSize) {
33+
int elementCount = 0;
34+
int totalSize = 0;
35+
while (elementCount < maxElements) {
36+
CompressedChunk element = queue.peek();
37+
if (element == null || element.getSize() + totalSize > maxSize) {
38+
break;
39+
} else {
40+
queue.poll();
41+
collection.add(element);
42+
elementCount++;
43+
totalSize += element.getSize();
44+
}
45+
}
46+
return totalSize;
47+
}
48+
49+
public synchronized boolean offer(CompressedChunk chunk) {
50+
int length = chunk.getSize();
51+
if (bytes >= maxBytes) {
52+
try {
53+
Thread.sleep(100);
54+
} catch (Exception e) {
55+
56+
}
57+
}
58+
bytes += length;
59+
chunkCount++;
60+
return queue.offer(chunk);
61+
}
62+
63+
public synchronized boolean isEmpty() {
64+
return queue.isEmpty();
65+
}
66+
}
67+
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package group.rxcloud.capa.spi.aws.log.handle;
18+
19+
public class CompressedChunk {
20+
21+
private String message;
22+
private int size;
23+
24+
public String getMessage() {
25+
return message;
26+
}
27+
28+
public void setMessage(String message) {
29+
this.message = message;
30+
}
31+
32+
public int getSize() {
33+
return size;
34+
}
35+
36+
public void setSize(int size) {
37+
this.size = size;
38+
}
39+
}
40+
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package group.rxcloud.capa.spi.aws.log.handle;
18+
19+
public class MessageConsumer {
20+
21+
private ChunkQueue chunkQueue;
22+
23+
public MessageConsumer(ChunkQueue chunkQueue) {
24+
this.chunkQueue = chunkQueue;
25+
}
26+
27+
public synchronized void processLogEvent(String message) {
28+
chunkQueue.offer(compressedChunk(message));
29+
}
30+
31+
private CompressedChunk compressedChunk(String message) {
32+
CompressedChunk compressedChunk = new CompressedChunk();
33+
compressedChunk.setMessage(message);
34+
compressedChunk.setSize(message.length());
35+
return compressedChunk;
36+
}
37+
}
38+
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package group.rxcloud.capa.spi.aws.log.handle;
18+
19+
import group.rxcloud.capa.infrastructure.hook.Mixer;
20+
import io.opentelemetry.api.common.AttributeKey;
21+
import io.opentelemetry.api.common.Attributes;
22+
import io.opentelemetry.api.metrics.LongCounter;
23+
import io.opentelemetry.api.metrics.Meter;
24+
import org.jetbrains.annotations.NotNull;
25+
26+
import java.util.Optional;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
public class MessageManager {
30+
31+
private static final Object lock = new Object();
32+
private static final float DEFAULT_CHUNK_QUEUE_MEM_PERCENT = 0.05f;
33+
private static final int DEFAULT_CHUNK_QUEUE_MEM_BYTES = 100 * 1024 * 1024;
34+
private static final String MESSAGE_MANAGER_ERROR_NAMESPACE = "LogMessageManagerError";
35+
private static final String MESSAGE_MANAGER_ERROR_METRIC_NAME = "LogsManagerError";
36+
private static volatile MessageManager messageManager;
37+
private static Optional<LongCounter> LONG_COUNTER = Optional.empty();
38+
39+
static {
40+
Mixer.telemetryHooksNullable()
41+
.ifPresent(telemetryHooks -> {
42+
Meter meter = telemetryHooks.buildMeter(MESSAGE_MANAGER_ERROR_NAMESPACE).block();
43+
LongCounter longCounter = meter.counterBuilder(MESSAGE_MANAGER_ERROR_METRIC_NAME).build();
44+
LONG_COUNTER = Optional.ofNullable(longCounter);
45+
});
46+
}
47+
48+
private final int chunkQueueMaxBytes;
49+
//private MonitorSender monitorSender;
50+
private final AtomicInteger senderNumber = new AtomicInteger(1);
51+
private final MessageConsumer consumer;
52+
private final MessageSender sender;
53+
private final ChunkQueue chunkQueue;
54+
private volatile boolean shutdownInProgress = false;
55+
56+
private MessageManager() {
57+
Runtime runtime = Runtime.getRuntime();
58+
runtime.addShutdownHook(new ClientFinalizer("AWSManager-ClientFinalizer"));
59+
60+
long maxMemory = runtime.maxMemory();
61+
int defaultChunkQueueMaxBytes = (int) (maxMemory * DEFAULT_CHUNK_QUEUE_MEM_PERCENT);
62+
if (defaultChunkQueueMaxBytes > DEFAULT_CHUNK_QUEUE_MEM_BYTES) {
63+
defaultChunkQueueMaxBytes = DEFAULT_CHUNK_QUEUE_MEM_BYTES;
64+
}
65+
chunkQueueMaxBytes = defaultChunkQueueMaxBytes;
66+
chunkQueue = new ChunkQueue(chunkQueueMaxBytes);
67+
sender = new MessageSender(chunkQueue);
68+
startNewSender();
69+
consumer = createConsumer();
70+
}
71+
72+
public static MessageManager getInstance() {
73+
if (messageManager == null) {
74+
synchronized (lock) {
75+
if (messageManager == null) {
76+
try {
77+
messageManager = new MessageManager();
78+
} catch (Throwable t) {
79+
LONG_COUNTER.ifPresent(longCounter -> {
80+
longCounter.bind(Attributes.of(AttributeKey.stringKey("ManagerGetInstanceError"), "ManagerGetInstanceError"))
81+
.add(1);
82+
});
83+
}
84+
}
85+
}
86+
}
87+
return messageManager;
88+
}
89+
90+
public MessageConsumer getConsumer() {
91+
return consumer;
92+
}
93+
94+
protected void startNewSender() {
95+
Thread t = new Thread(sender);
96+
t.setName("AWSManager-MessageSender" + "-" + senderNumber.getAndIncrement());
97+
t.setDaemon(true);
98+
t.start();
99+
}
100+
101+
public MessageConsumer createConsumer() {
102+
MessageConsumer consumer = new MessageConsumer(chunkQueue);
103+
return consumer;
104+
}
105+
106+
public void shutdown() {
107+
synchronized (lock) {
108+
if (this.shutdownInProgress) {
109+
return;
110+
}
111+
this.shutdownInProgress = true;
112+
}
113+
sender.shutdown();
114+
}
115+
116+
class ClientFinalizer extends Thread {
117+
public ClientFinalizer(@NotNull String name) {
118+
super(name);
119+
}
120+
121+
@Override
122+
public void run() {
123+
try {
124+
MessageManager.getInstance().shutdown();
125+
} catch (Exception e) {
126+
LONG_COUNTER.ifPresent(longCounter -> {
127+
longCounter.bind(Attributes.of(AttributeKey.stringKey("ClientFinalizerError"), "ClientFinalizerError"))
128+
.add(1);
129+
});
130+
}
131+
}
132+
}
133+
}
134+

0 commit comments

Comments
 (0)