Skip to content

Commit 5eb36e6

Browse files
committed
Add config autowired interface to set the log appender params.
1 parent 5f755e4 commit 5eb36e6

File tree

6 files changed

+53
-52
lines changed

6 files changed

+53
-52
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.log.collector;
2+
3+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig;
4+
5+
/**
6+
* Streamis config autowired
7+
*/
8+
public interface StreamisConfigAutowired {
9+
10+
/**
11+
* Log appender config
12+
* @param builder builder
13+
*/
14+
StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Builder builder);
15+
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.webank.wedatasphere.streamis.jobmanager.log.collector.cache.LogCache;
44
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.RpcLogSenderConfig;
55
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig;
6-
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfigBuilder;
76
import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.StreamisRpcLogSender;
87
import com.webank.wedatasphere.streamis.jobmanager.log.entities.StreamisLogEvent;
98
import org.apache.logging.log4j.core.Filter;
@@ -18,12 +17,10 @@
1817
import org.apache.logging.log4j.core.layout.PatternLayout;
1918

2019
import java.io.Serializable;
21-
import java.nio.Buffer;
2220
import java.util.ArrayList;
2321
import java.util.List;
2422
import java.util.Objects;
2523
import java.util.ServiceLoader;
26-
import java.util.concurrent.TimeUnit;
2724

2825
/**
2926
* Streamis rpc log appender
@@ -84,16 +81,21 @@ public static StreamisRpcLogAppender createAppender(@PluginAttribute("name") Str
8481
if (Objects.isNull(layout)){
8582
layout = PatternLayout.createDefaultLayout();
8683
}
87-
// Search the config builder
88-
List<StreamisLogAppenderConfigBuilder> configBuilders = new ArrayList<>();
84+
// Search the config autowired class
85+
List<StreamisConfigAutowired> configAutowiredEntities = new ArrayList<>();
8986
StreamisLogAppenderConfig logAppenderConfig = null;
90-
ServiceLoader.load(StreamisLogAppenderConfigBuilder.class,
91-
StreamisRpcLogAppender.class.getClassLoader()).iterator().forEachRemaining(configBuilders::add);
92-
if (!configBuilders.isEmpty()){
93-
logAppenderConfig = configBuilders.get(0).build(applicationName, filter, rpcLogSenderConfig);
87+
ServiceLoader.load(StreamisConfigAutowired.class,
88+
StreamisRpcLogAppender.class.getClassLoader()).iterator().forEachRemaining(configAutowiredEntities::add);
89+
StreamisLogAppenderConfig.Builder builder = new StreamisLogAppenderConfig.Builder(applicationName, filter, rpcLogSenderConfig);
90+
for (StreamisConfigAutowired autowired : configAutowiredEntities){
91+
logAppenderConfig = autowired.logAppenderConfig(builder);
9492
}
9593
if (Objects.isNull(logAppenderConfig)){
96-
logAppenderConfig = new StreamisLogAppenderConfig(applicationName, filter, rpcLogSenderConfig);
94+
logAppenderConfig = builder.build();
95+
}
96+
applicationName = logAppenderConfig.getApplicationName();
97+
if (null == applicationName || applicationName.trim().equals("")){
98+
throw new IllegalArgumentException("Application name cannot be empty");
9799
}
98100
return new StreamisRpcLogAppender(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY, logAppenderConfig);
99101
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/StreamisLogAppenderConfig.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,38 @@ public StreamisLogAppenderConfig(String applicationName, Filter filter,
2323
this.senderConfig = null != rpcLogSenderConfig? rpcLogSenderConfig : new RpcLogSenderConfig();
2424
}
2525

26+
public static class Builder{
27+
private String applicationName;
28+
29+
private Filter filter;
30+
31+
private RpcLogSenderConfig rpcLogSenderConfig;
32+
33+
public Builder(String applicationName, Filter filter,
34+
RpcLogSenderConfig rpcLogSenderConfig){
35+
this.applicationName = applicationName;
36+
this.filter = filter;
37+
this.rpcLogSenderConfig = rpcLogSenderConfig;
38+
}
39+
40+
StreamisLogAppenderConfig.Builder setAppName(){
41+
return null;
42+
}
43+
44+
public StreamisLogAppenderConfig build(){
45+
return null;
46+
}
47+
}
2648
public String getApplicationName() {
2749
return applicationName;
2850
}
2951

30-
public void setApplicationName(String applicationName) {
31-
this.applicationName = applicationName;
32-
}
33-
3452
public Filter getFilter() {
3553
return filter;
3654
}
3755

38-
public void setFilter(Filter filter) {
39-
this.filter = filter;
40-
}
41-
4256
public RpcLogSenderConfig getSenderConfig() {
4357
return senderConfig;
4458
}
4559

46-
public void setSenderConfig(RpcLogSenderConfig senderConfig) {
47-
this.senderConfig = senderConfig;
48-
}
4960
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/StreamisLogAppenderConfigBuilder.java

Lines changed: 0 additions & 27 deletions
This file was deleted.

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/test/resources/log4j2.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
<appenders>
2121
<StreamRpcLog name="StreamRpcLog" appName="stream_application">
2222
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"/>
23-
<RpcLogSender address="http://127.0.0.1/log/collector" sendRetryCnt="3"
24-
connectionTimeout="3000" socketTimeout="15000" serverRecoveryTimeInSec="5" maxDelayTimeInSec="60">
23+
<RpcLogSender sendRetryCnt="3" connectionTimeout="3000"
24+
socketTimeout="15000" serverRecoveryTimeInSec="5" maxDelayTimeInSec="60">
2525
<AuthConfig tokenCodeKey="" tokenCode="" tokenUser="" tokenUserKey=""/>
2626
<SendLogCache size="200" maxConsumeThread="10"/>
2727
<SendBuffer size="50" expireTimeInSec="2"/>

streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/JobLogStorage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ public interface JobLogStorage {
1010

1111
/**
1212
* Create buckets
13-
* @param jobName job name
13+
* @param appName application name
1414
* @param bucketConfig bucket config
1515
* @return config
1616
*/
17-
JobLogBucket getOrCreateBucket(String jobName, JobLogBucketConfig bucketConfig);
17+
JobLogBucket getOrCreateBucket(String appName, JobLogBucketConfig bucketConfig);
1818
}

0 commit comments

Comments
 (0)