Skip to content

Commit 9d31ce6

Browse files
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into dev-0.2.3-log-collector
2 parents b871ea6 + 5f755e4 commit 9d31ce6

File tree

27 files changed

+989
-260
lines changed

27 files changed

+989
-260
lines changed

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/pom.xml

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
<maven.compiler.target>8</maven.compiler.target>
1717
<httpclient.version>4.5.13</httpclient.version>
1818
<httpmine.version>4.5.4</httpmine.version>
19-
<jackson-databind.version>2.13.2.2</jackson-databind.version>
19+
<log4j.version>2.17.1</log4j.version>
20+
<slf4j.version>1.7.15</slf4j.version>
2021
</properties>
2122

2223
<dependencies>
@@ -36,11 +37,33 @@
3637
<artifactId>httpmime</artifactId>
3738
<version>${httpmine.version}</version>
3839
</dependency>
39-
<!-- jackson module -->
40+
<!--log4j2-->
4041
<dependency>
41-
<groupId>com.fasterxml.jackson.core</groupId>
42-
<artifactId>jackson-databind</artifactId>
43-
<version>${jackson-databind.version}</version>
42+
<groupId>org.slf4j</groupId>
43+
<artifactId>slf4j-api</artifactId>
44+
<version>${slf4j.version}</version>
45+
<scope>provided</scope>
46+
</dependency>
47+
48+
<dependency>
49+
<groupId>org.apache.logging.log4j</groupId>
50+
<artifactId>log4j-slf4j-impl</artifactId>
51+
<version>${log4j.version}</version>
52+
<scope>provided</scope>
53+
</dependency>
54+
55+
<dependency>
56+
<groupId>org.apache.logging.log4j</groupId>
57+
<artifactId>log4j-api</artifactId>
58+
<version>${log4j.version}</version>
59+
<scope>provided</scope>
60+
</dependency>
61+
62+
<dependency>
63+
<groupId>org.apache.logging.log4j</groupId>
64+
<artifactId>log4j-core</artifactId>
65+
<version>${log4j.version}</version>
66+
<scope>provided</scope>
4467
</dependency>
4568
</dependencies>
4669
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.log.collector;
2+
3+
/**
4+
* Exception listener
5+
*/
6+
public interface ExceptionListener {
7+
8+
/**
9+
* Listen the exception
10+
* @param subject the subject that throws the exception
11+
* @param t Throwable
12+
* @param message message
13+
*/
14+
void onException(Object subject, Throwable t, String message);
15+
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisLogAppenderConfig.java

Lines changed: 0 additions & 4 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,101 @@
11
package com.webank.wedatasphere.streamis.jobmanager.log.collector;
22

3-
public class StreamisRpcLogAppender {
3+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.cache.LogCache;
4+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.RpcLogSenderConfig;
5+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig;
6+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfigBuilder;
7+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.StreamisRpcLogSender;
8+
import com.webank.wedatasphere.streamis.jobmanager.log.entities.StreamisLogEvent;
9+
import org.apache.logging.log4j.core.Filter;
10+
import org.apache.logging.log4j.core.Layout;
11+
import org.apache.logging.log4j.core.LogEvent;
12+
import org.apache.logging.log4j.core.appender.AbstractAppender;
13+
import org.apache.logging.log4j.core.config.Property;
14+
import org.apache.logging.log4j.core.config.plugins.Plugin;
15+
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
16+
import org.apache.logging.log4j.core.config.plugins.PluginElement;
17+
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
18+
import org.apache.logging.log4j.core.layout.PatternLayout;
19+
20+
import java.io.Serializable;
21+
import java.nio.Buffer;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.Objects;
25+
import java.util.ServiceLoader;
26+
import java.util.concurrent.TimeUnit;
27+
28+
/**
29+
* Streamis rpc log appender
30+
*/
31+
@Plugin(name = "StreamRpcLog", category = "Core", elementType = "appender", printObject = true)
32+
public class StreamisRpcLogAppender extends AbstractAppender {
33+
34+
private static final String DEFAULT_APPENDER_NAME = "StreamRpcLog";
35+
36+
/**
37+
* Appender config
38+
*/
39+
private StreamisLogAppenderConfig appenderConfig;
40+
41+
/**
42+
* Rpc log sender
43+
*/
44+
private StreamisRpcLogSender rpcLogSender;
45+
46+
/**
47+
* Cache
48+
*/
49+
private LogCache<StreamisLogEvent> logCache;
50+
protected StreamisRpcLogAppender(String name, Filter filter,
51+
Layout<? extends Serializable> layout,
52+
boolean ignoreExceptions, Property[] properties,
53+
StreamisLogAppenderConfig appenderConfig) {
54+
super(name, filter, layout, ignoreExceptions, properties);
55+
this.appenderConfig = appenderConfig;
56+
this.rpcLogSender = new StreamisRpcLogSender(this.appenderConfig.getApplicationName(),
57+
this.appenderConfig.getSenderConfig());
58+
this.logCache = this.rpcLogSender.getOrCreateLogCache();
59+
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.rpcLogSender.close()));
60+
}
61+
62+
@Override
63+
public void append(LogEvent event) {
64+
String content = new String(getLayout().toByteArray(event));
65+
// Transform to stream log event;
66+
StreamisLogEvent logEvent = new StreamisLogEvent(content, System.currentTimeMillis());
67+
try {
68+
this.logCache.cacheLog(logEvent);
69+
} catch (InterruptedException e) {
70+
LOGGER.error("StreamisRpcLogAppender: {} interrupted when cache the log into the RPC sender, message: {}", this.getName(), e.getMessage());
71+
}
72+
}
73+
74+
@PluginFactory
75+
public static StreamisRpcLogAppender createAppender(@PluginAttribute("name") String name,
76+
@PluginAttribute("appName") String applicationName,
77+
@PluginAttribute("ignoreExceptions") boolean ignoreExceptions,
78+
@PluginElement("Filter") final Filter filter,
79+
@PluginElement("Layout") Layout<? extends Serializable> layout,
80+
@PluginElement("RpcLogSender")RpcLogSenderConfig rpcLogSenderConfig){
81+
if (null == name || name.trim().equals("")){
82+
name = DEFAULT_APPENDER_NAME;
83+
}
84+
if (Objects.isNull(layout)){
85+
layout = PatternLayout.createDefaultLayout();
86+
}
87+
// Search the config builder
88+
List<StreamisLogAppenderConfigBuilder> configBuilders = new ArrayList<>();
89+
StreamisLogAppenderConfig logAppenderConfig = null;
90+
ServiceLoader.load(StreamisLogAppenderConfigBuilder.class,
91+
StreamisRpcLogAppender.class.getClassLoader()).iterator().forEachRemaining(configBuilders::add);
92+
if (!configBuilders.isEmpty()){
93+
logAppenderConfig = configBuilders.get(0).build(applicationName, filter, rpcLogSenderConfig);
94+
}
95+
if (Objects.isNull(logAppenderConfig)){
96+
logAppenderConfig = new StreamisLogAppenderConfig(applicationName, filter, rpcLogSenderConfig);
97+
}
98+
return new StreamisRpcLogAppender(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY, logAppenderConfig);
99+
}
4100

5101
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.log.collector.config;
2+
3+
import org.apache.logging.log4j.core.config.plugins.Plugin;
4+
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
5+
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
6+
7+
/**
8+
* Authentication config
9+
*/
10+
@Plugin(
11+
name = "AuthConfig",
12+
category = "Core",
13+
printObject = true
14+
)
15+
public class RpcAuthConfig {
16+
/**
17+
* Key of token-code
18+
*/
19+
private String tokenCodeKey = "Token-Code";
20+
21+
/**
22+
* Key of token-user
23+
*/
24+
private String tokenUserKey = "Token-User";
25+
26+
/**
27+
* Token user
28+
*/
29+
private String tokenUser = System.getProperty("user.name");
30+
31+
/**
32+
* Token code
33+
*/
34+
private String tokenCode = "STREAM-LOG";
35+
36+
public RpcAuthConfig(){
37+
38+
}
39+
40+
public RpcAuthConfig(String tokenCodeKey, String tokenCode, String tokenUserKey, String tokenUser){
41+
if (null != tokenCodeKey) {
42+
this.tokenCodeKey = tokenCodeKey;
43+
}
44+
if (null != tokenCode){
45+
this.tokenCode = tokenCode;
46+
}
47+
if (null != tokenUserKey){
48+
this.tokenUserKey = tokenUserKey;
49+
}
50+
if (null != tokenUser){
51+
this.tokenUser = tokenUser;
52+
}
53+
}
54+
55+
@PluginFactory
56+
public static RpcAuthConfig createRpcAuthConfig(@PluginAttribute("tokenCodeKey") String tokenCodeKey,
57+
@PluginAttribute("tokenCode") String tokenCode,
58+
@PluginAttribute("tokenUserKey") String tokenUserKey, @PluginAttribute("tokenUser") String tokenUser){
59+
return new RpcAuthConfig(tokenCodeKey, tokenCode, tokenUserKey, tokenUser);
60+
}
61+
public String getTokenCodeKey() {
62+
return tokenCodeKey;
63+
}
64+
65+
public void setTokenCodeKey(String tokenCodeKey) {
66+
this.tokenCodeKey = tokenCodeKey;
67+
}
68+
69+
public String getTokenUserKey() {
70+
return tokenUserKey;
71+
}
72+
73+
public void setTokenUserKey(String tokenUserKey) {
74+
this.tokenUserKey = tokenUserKey;
75+
}
76+
77+
public String getTokenUser() {
78+
return tokenUser;
79+
}
80+
81+
public void setTokenUser(String tokenUser) {
82+
this.tokenUser = tokenUser;
83+
}
84+
85+
public String getTokenCode() {
86+
return tokenCode;
87+
}
88+
89+
public void setTokenCode(String tokenCode) {
90+
this.tokenCode = tokenCode;
91+
}
92+
}

0 commit comments

Comments
 (0)