Skip to content

Commit 0cedc39

Browse files
committed
Complete the log collector (95%)
1 parent 003d387 commit 0cedc39

File tree

22 files changed

+794
-242
lines changed

22 files changed

+794
-242
lines changed

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/pom.xml

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
<maven.compiler.target>8</maven.compiler.target>
1717
<httpclient.version>4.5.13</httpclient.version>
1818
<httpmine.version>4.5.4</httpmine.version>
19-
<jackson-databind.version>2.13.2.2</jackson-databind.version>
19+
<log4j.version>2.17.1</log4j.version>
20+
<slf4j.version>1.7.15</slf4j.version>
2021
</properties>
2122

2223
<dependencies>
@@ -36,11 +37,33 @@
3637
<artifactId>httpmime</artifactId>
3738
<version>${httpmine.version}</version>
3839
</dependency>
39-
<!-- jackson module -->
40+
<!--log4j2-->
4041
<dependency>
41-
<groupId>com.fasterxml.jackson.core</groupId>
42-
<artifactId>jackson-databind</artifactId>
43-
<version>${jackson-databind.version}</version>
42+
<groupId>org.slf4j</groupId>
43+
<artifactId>slf4j-api</artifactId>
44+
<version>${slf4j.version}</version>
45+
<scope>provided</scope>
46+
</dependency>
47+
48+
<dependency>
49+
<groupId>org.apache.logging.log4j</groupId>
50+
<artifactId>log4j-slf4j-impl</artifactId>
51+
<version>${log4j.version}</version>
52+
<scope>provided</scope>
53+
</dependency>
54+
55+
<dependency>
56+
<groupId>org.apache.logging.log4j</groupId>
57+
<artifactId>log4j-api</artifactId>
58+
<version>${log4j.version}</version>
59+
<scope>provided</scope>
60+
</dependency>
61+
62+
<dependency>
63+
<groupId>org.apache.logging.log4j</groupId>
64+
<artifactId>log4j-core</artifactId>
65+
<version>${log4j.version}</version>
66+
<scope>provided</scope>
4467
</dependency>
4568
</dependencies>
4669
</project>

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisLogAppenderConfig.java

Lines changed: 0 additions & 4 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,100 @@
11
package com.webank.wedatasphere.streamis.jobmanager.log.collector;
22

3-
public class StreamisRpcLogAppender {
3+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.cache.LogCache;
4+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.RpcLogSenderConfig;
5+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig;
6+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfigBuilder;
7+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.StreamisRpcLogSender;
8+
import com.webank.wedatasphere.streamis.jobmanager.log.entities.StreamisLogEvent;
9+
import org.apache.logging.log4j.core.Filter;
10+
import org.apache.logging.log4j.core.Layout;
11+
import org.apache.logging.log4j.core.LogEvent;
12+
import org.apache.logging.log4j.core.appender.AbstractAppender;
13+
import org.apache.logging.log4j.core.config.Property;
14+
import org.apache.logging.log4j.core.config.plugins.Plugin;
15+
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
16+
import org.apache.logging.log4j.core.config.plugins.PluginElement;
17+
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
18+
import org.apache.logging.log4j.core.layout.PatternLayout;
19+
20+
import java.io.Serializable;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Objects;
24+
import java.util.ServiceLoader;
25+
import java.util.concurrent.TimeUnit;
26+
27+
/**
28+
* Streamis rpc log appender
29+
*/
30+
@Plugin(name = "StreamRpcLog", category = "Core", elementType = "appender", printObject = true)
31+
public class StreamisRpcLogAppender extends AbstractAppender {
32+
33+
private static final String DEFAULT_APPENDER_NAME = "StreamRpcLog";
34+
35+
/**
36+
* Appender config
37+
*/
38+
private StreamisLogAppenderConfig appenderConfig;
39+
40+
/**
41+
* Rpc log sender
42+
*/
43+
private StreamisRpcLogSender rpcLogSender;
44+
45+
/**
46+
* Cache
47+
*/
48+
private LogCache<StreamisLogEvent> logCache;
49+
protected StreamisRpcLogAppender(String name, Filter filter,
50+
Layout<? extends Serializable> layout,
51+
boolean ignoreExceptions, Property[] properties,
52+
StreamisLogAppenderConfig appenderConfig) {
53+
super(name, filter, layout, ignoreExceptions, properties);
54+
this.appenderConfig = appenderConfig;
55+
this.rpcLogSender = new StreamisRpcLogSender(this.appenderConfig.getApplicationName(),
56+
this.appenderConfig.getSenderConfig());
57+
this.logCache = this.rpcLogSender.getOrCreateLogCache();
58+
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.rpcLogSender.close()));
59+
}
60+
61+
@Override
62+
public void append(LogEvent event) {
63+
String content = new String(getLayout().toByteArray(event));
64+
// Transform to stream log event;
65+
StreamisLogEvent logEvent = new StreamisLogEvent(content, System.currentTimeMillis());
66+
try {
67+
this.logCache.cacheLog(logEvent);
68+
} catch (InterruptedException e) {
69+
LOGGER.error("StreamisRpcLogAppender: {} interrupted when cache the log into the RPC sender, message: {}", this.getName(), e.getMessage());
70+
}
71+
}
72+
73+
@PluginFactory
74+
public static StreamisRpcLogAppender createAppender(@PluginAttribute("name") String name,
75+
@PluginAttribute("appName") String applicationName,
76+
@PluginAttribute("ignoreExceptions") boolean ignoreExceptions,
77+
@PluginElement("Filter") final Filter filter,
78+
@PluginElement("Layout") Layout<? extends Serializable> layout,
79+
@PluginElement("RpcLogSender")RpcLogSenderConfig rpcLogSenderConfig){
80+
if (null == name || name.trim().equals("")){
81+
name = DEFAULT_APPENDER_NAME;
82+
}
83+
if (Objects.isNull(layout)){
84+
layout = PatternLayout.createDefaultLayout();
85+
}
86+
// Search the config builder
87+
List<StreamisLogAppenderConfigBuilder> configBuilders = new ArrayList<>();
88+
StreamisLogAppenderConfig logAppenderConfig = null;
89+
ServiceLoader.load(StreamisLogAppenderConfigBuilder.class,
90+
StreamisRpcLogAppender.class.getClassLoader()).iterator().forEachRemaining(configBuilders::add);
91+
if (!configBuilders.isEmpty()){
92+
logAppenderConfig = configBuilders.get(0).build(applicationName, filter, rpcLogSenderConfig);
93+
}
94+
if (Objects.isNull(logAppenderConfig)){
95+
logAppenderConfig = new StreamisLogAppenderConfig(applicationName, filter, rpcLogSenderConfig);
96+
}
97+
return new StreamisRpcLogAppender(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY, logAppenderConfig);
98+
}
499

5100
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.log.collector.config;
2+
3+
import org.apache.logging.log4j.core.config.plugins.Plugin;
4+
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
5+
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
6+
7+
/**
8+
* Authentication config
9+
*/
10+
@Plugin(
11+
name = "AuthConfig",
12+
category = "Core",
13+
printObject = true
14+
)
15+
public class RpcAuthConfig {
16+
/**
17+
* Key of token-code
18+
*/
19+
private String tokenCodeKey = "Token-Code";
20+
21+
/**
22+
* Key of token-user
23+
*/
24+
private String tokenUserKey = "Token-User";
25+
26+
/**
27+
* Token user
28+
*/
29+
private String tokenUser = System.getProperty("user.name");
30+
31+
/**
32+
* Token code
33+
*/
34+
private String tokenCode = "STREAM-LOG";
35+
36+
public RpcAuthConfig(){
37+
38+
}
39+
40+
public RpcAuthConfig(String tokenCodeKey, String tokenCode, String tokenUserKey, String tokenUser){
41+
if (null != tokenCodeKey) {
42+
this.tokenCodeKey = tokenCodeKey;
43+
}
44+
if (null != tokenCode){
45+
this.tokenCode = tokenCode;
46+
}
47+
if (null != tokenUserKey){
48+
this.tokenUserKey = tokenUserKey;
49+
}
50+
if (null != tokenUser){
51+
this.tokenUser = tokenUser;
52+
}
53+
}
54+
55+
@PluginFactory
56+
public static RpcAuthConfig createRpcAuthConfig(@PluginAttribute("tokenCodeKey") String tokenCodeKey,
57+
@PluginAttribute("tokenCode") String tokenCode,
58+
@PluginAttribute("tokenUserKey") String tokenUserKey, @PluginAttribute("tokenUser") String tokenUser){
59+
return new RpcAuthConfig(tokenCodeKey, tokenCode, tokenUserKey, tokenUser);
60+
}
61+
public String getTokenCodeKey() {
62+
return tokenCodeKey;
63+
}
64+
65+
public void setTokenCodeKey(String tokenCodeKey) {
66+
this.tokenCodeKey = tokenCodeKey;
67+
}
68+
69+
public String getTokenUserKey() {
70+
return tokenUserKey;
71+
}
72+
73+
public void setTokenUserKey(String tokenUserKey) {
74+
this.tokenUserKey = tokenUserKey;
75+
}
76+
77+
public String getTokenUser() {
78+
return tokenUser;
79+
}
80+
81+
public void setTokenUser(String tokenUser) {
82+
this.tokenUser = tokenUser;
83+
}
84+
85+
public String getTokenCode() {
86+
return tokenCode;
87+
}
88+
89+
public void setTokenCode(String tokenCode) {
90+
this.tokenCode = tokenCode;
91+
}
92+
}

0 commit comments

Comments
 (0)