Skip to content

Commit 9740a62

Browse files
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into dev-0.2.3-log-collector
2 parents 3d0359a + 25f772d commit 9740a62

File tree

39 files changed

+1384
-85
lines changed

39 files changed

+1384
-85
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646

4747
<properties>
4848
<linkis.version>1.1.3</linkis.version>
49+
<junit.version>4.12</junit.version>
4950
<dss.version>1.1.0</dss.version>
5051
<streamis.version>0.2.0</streamis.version>
5152
<scala.version>2.11.12</scala.version>

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/conf/JobConfKeyConstants.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ import org.apache.linkis.common.conf.CommonVars
2323
*/
2424
object JobConfKeyConstants {
2525

26+
/**
27+
* Config group for streamis internal configuration
28+
*/
29+
val GROUP_INTERNAL: CommonVars[String] = CommonVars("wds.streamis.job.internal.config.group", "wds.streamis.internal.params")
2630
/**
2731
* Group: Flink extra
2832
*/

streamis-jobmanager/streamis-job-log/flink-streamis-log-collector/pom.xml

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,80 @@
1616
<maven.compiler.target>8</maven.compiler.target>
1717
<!-- The flink version dependent-->
1818
<flink.version>1.12.2</flink.version>
19+
<log4j.version>2.17.1</log4j.version>
20+
<slf4j.version>1.7.15</slf4j.version>
1921
</properties>
2022
<dependencies>
23+
<dependency>
24+
<groupId>com.webank.wedatasphere.streamis</groupId>
25+
<artifactId>streamis-job-log-collector</artifactId>
26+
<version>${streamis.version}</version>
27+
</dependency>
2128
<!-- flink basic dependencies-->
2229
<dependency>
2330
<groupId>org.apache.flink</groupId>
2431
<artifactId>flink-java</artifactId>
2532
<version>${flink.version}</version>
33+
<scope>provided</scope>
34+
</dependency>
35+
<dependency>
36+
<groupId>org.apache.flink</groupId>
37+
<artifactId>flink-yarn_2.11</artifactId>
38+
<version>${flink.version}</version>
39+
<scope>provided</scope>
40+
</dependency>
41+
<!--Junit-->
42+
<dependency>
43+
<groupId>junit</groupId>
44+
<artifactId>junit</artifactId>
45+
<version>${junit.version}</version>
46+
<scope>test</scope>
47+
</dependency>
48+
<!--log4j2-->
49+
<dependency>
50+
<groupId>org.slf4j</groupId>
51+
<artifactId>slf4j-api</artifactId>
52+
<version>${slf4j.version}</version>
53+
<scope>provided</scope>
54+
</dependency>
55+
56+
<dependency>
57+
<groupId>org.apache.logging.log4j</groupId>
58+
<artifactId>log4j-slf4j-impl</artifactId>
59+
<version>${log4j.version}</version>
60+
<scope>provided</scope>
61+
</dependency>
62+
63+
<dependency>
64+
<groupId>org.apache.logging.log4j</groupId>
65+
<artifactId>log4j-api</artifactId>
66+
<version>${log4j.version}</version>
67+
<scope>provided</scope>
2668
</dependency>
2769
</dependencies>
70+
<build>
71+
<plugins>
72+
<plugin>
73+
<groupId>org.apache.maven.plugins</groupId>
74+
<artifactId>maven-assembly-plugin</artifactId>
75+
<version>2.3</version>
76+
<executions>
77+
<execution>
78+
<id>assemble</id>
79+
<goals>
80+
<goal>single</goal>
81+
</goals>
82+
<!-- install -->
83+
<phase>install</phase>
84+
</execution>
85+
</executions>
86+
<configuration>
87+
<descriptors>
88+
<descriptor>src/main/assembly/package.xml</descriptor>
89+
</descriptors>
90+
<appendAssemblyId>false</appendAssemblyId>
91+
</configuration>
92+
</plugin>
93+
</plugins>
94+
</build>
2895
</project>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<assembly
2+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
5+
6+
<id>package</id>
7+
8+
<formats>
9+
<format>jar</format>
10+
</formats>
11+
<includeBaseDirectory>false</includeBaseDirectory>
12+
<dependencySets>
13+
<dependencySet>
14+
<outputDirectory>/</outputDirectory>
15+
<unpack>true</unpack>
16+
<scope>runtime</scope>
17+
</dependencySet>
18+
</dependencySets>
19+
</assembly>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.log.collector.flink;
2+
3+
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig;
4+
import com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired;
5+
import org.apache.commons.lang3.StringUtils;
6+
import org.apache.flink.configuration.Configuration;
7+
import org.apache.flink.configuration.GlobalConfiguration;
8+
import org.apache.flink.runtime.util.EnvironmentInformation;
9+
import org.apache.flink.yarn.configuration.YarnConfigOptions;
10+
import org.apache.logging.log4j.Level;
11+
import org.apache.logging.log4j.core.Filter;
12+
import org.apache.logging.log4j.core.filter.LevelMatchFilter;
13+
import org.apache.logging.log4j.core.filter.RegexFilter;
14+
15+
import java.util.Enumeration;
16+
import java.util.List;
17+
import java.util.Properties;
18+
19+
import static com.webank.wedatasphere.streamis.jobmanager.log.collector.flink.FlinkStreamisConfigDefine.*;
20+
21+
/**
22+
* Autoconfigure the streamis config inf Flink environment
23+
*/
24+
public class FlinkStreamisConfigAutowired implements StreamisConfigAutowired {
25+
26+
/**
27+
* Flink configuration
28+
*/
29+
private Configuration configuration;
30+
31+
public FlinkStreamisConfigAutowired(){
32+
// First to load configuration
33+
// We should sleep and wait for append of the flink-yaml.conf
34+
}
35+
@Override
36+
public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Builder builder) throws Exception{
37+
this.configuration = loadConfiguration();
38+
String applicationName =
39+
this.configuration.getString(YarnConfigOptions.APPLICATION_NAME);
40+
if (StringUtils.isNotBlank(applicationName)){
41+
builder.setAppName(applicationName);
42+
}
43+
String gateway = this.configuration.getString(LOG_GATEWAY_ADDRESS);
44+
if (StringUtils.isNotBlank(gateway)){
45+
if (gateway.endsWith("/")){
46+
gateway = gateway.substring(0, gateway.length() - 1);
47+
}
48+
gateway += this.configuration.getString(LOG_COLLECT_PATH, "/");
49+
builder.setRpcAddress(gateway);
50+
}
51+
List<String> filterStrategies = this.configuration.get(LOG_FILTER_STRATEGIES);
52+
for(String filterStrategy : filterStrategies){
53+
if ("LevelMatch".equals(filterStrategy)){
54+
builder.withFilter(LevelMatchFilter.newBuilder().setOnMatch(Filter.Result.ACCEPT).setOnMismatch(Filter.Result.DENY)
55+
.setLevel(Level.getLevel(this.configuration.getString(LOG_FILTER_LEVEL_MATCH))).build());
56+
} else if ("RegexMatch".equals(filterStrategy)){
57+
builder.withFilter(RegexFilter.createFilter( this.configuration.getString(LOG_FILTER_REGEX),
58+
null, true, Filter.Result.ACCEPT, Filter.Result.DENY));
59+
}
60+
}
61+
String hadoopUser = EnvironmentInformation.getHadoopUser();
62+
if (hadoopUser.equals("<no hadoop dependency found>") || hadoopUser.equals("<unknown>")){
63+
hadoopUser = "";
64+
}
65+
return builder.setRpcConnTimeout(this.configuration.getInteger(LOG_RPC_CONN_TIMEOUT))
66+
.setRpcSocketTimeout(this.configuration.getInteger(LOG_RPC_SOCKET_TIMEOUT))
67+
.setRpcSendRetryCnt(this.configuration.getInteger(LOG_RPC_SEND_RETRY_COUNT))
68+
.setRpcServerRecoveryTimeInSec(this.configuration.getInteger(LOG_RPC_SERVER_RECOVERY_TIME))
69+
.setRpcMaxDelayTimeInSec(this.configuration.getInteger(LOG_RPC_MAX_DELAY_TIME))
70+
.setRpcAuthTokenCodeKey(this.configuration.getString(LOG_RPC_AUTH_TOKEN_CODE_KEY))
71+
.setRpcAuthTokenUserKey(this.configuration.getString(LOG_RPC_AUTH_TOKEN_USER_KEY))
72+
.setRpcAuthTokenCode(this.configuration.getString(LOG_RPC_AUTH_TOKEN_CODE))
73+
.setRpcAuthTokenUser(this.configuration.getString(LOG_RPC_AUTH_TOKEN_USER,
74+
hadoopUser))
75+
.setRpcCacheSize(this.configuration.getInteger(LOG_RPC_CACHE_SIZE))
76+
.setRpcCacheMaxConsumeThread(this.configuration.getInteger(LOG_PRC_CACHE_MAX_CONSUME_THREAD))
77+
.setRpcBufferSize(this.configuration.getInteger(LOG_RPC_BUFFER_SIZE))
78+
.setRpcBufferExpireTimeInSec(this.configuration.getInteger(LOG_RPC_BUFFER_EXPIRE_TIME)).build();
79+
}
80+
81+
/**
82+
* According to :
83+
* String launchCommand =
84+
* BootstrapTools.getTaskManagerShellCommand(
85+
* flinkConfig,
86+
* tmParams,
87+
* ".",
88+
* ApplicationConstants.LOG_DIR_EXPANSION_VAR,
89+
* hasLogback,
90+
* hasLog4j,
91+
* hasKrb5,
92+
* taskManagerMainClass,
93+
* taskManagerDynamicProperties);
94+
* the configuration directory of Flink yarn container is always ".",
95+
* @return configuration
96+
*/
97+
private synchronized Configuration loadConfiguration(){
98+
// String configDir = System.getenv("FLINK_CONF_DIR");
99+
// if (null == configDir){
100+
// configDir = ".";
101+
// }
102+
String configDir = ".";
103+
Properties properties = System.getProperties();
104+
Enumeration<?> enumeration = properties.propertyNames();
105+
Configuration dynamicConfiguration = new Configuration();
106+
while(enumeration.hasMoreElements()){
107+
String prop = String.valueOf(enumeration.nextElement());
108+
dynamicConfiguration.setString(prop, properties.getProperty(prop));
109+
}
110+
return GlobalConfiguration.loadConfiguration(configDir, dynamicConfiguration);
111+
}
112+
113+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.log.collector.flink;
2+
3+
import org.apache.flink.configuration.ConfigOption;
4+
import org.apache.flink.configuration.ConfigOptions;
5+
6+
import java.util.List;
7+
8+
/**
9+
* Config definition
10+
*/
11+
public class FlinkStreamisConfigDefine {
12+
13+
/**
14+
* Gateway address of log module for streamis
15+
*/
16+
public static final ConfigOption<String> LOG_GATEWAY_ADDRESS = ConfigOptions.key("stream.log.gateway.address")
17+
.stringType().noDefaultValue().withDescription("The gateway address ex: http://127.0.0.1:8080");
18+
19+
/**
20+
* Entrypoint path of collecting log
21+
*/
22+
public static final ConfigOption<String> LOG_COLLECT_PATH = ConfigOptions.key("stream.log.collect.path")
23+
.stringType().defaultValue("/api/rest_j/v1/streamis/streamJobManager/log/collect/events").withDescription("The entrypoint path of collecting log");
24+
25+
/**
26+
* Connection timeout(in milliseconds) in log RPC module
27+
*/
28+
public static final ConfigOption<Integer> LOG_RPC_CONN_TIMEOUT = ConfigOptions.key("stream.log.rpc.connect-timeout")
29+
.intType().defaultValue(3000).withDescription("Connection timeout(ms) in log RPC module");
30+
31+
/**
32+
* Socket timeout(in milliseconds) in log RPC module
33+
*/
34+
public static final ConfigOption<Integer> LOG_RPC_SOCKET_TIMEOUT = ConfigOptions.key("stream.log.rpc.socket-timeout")
35+
.intType().defaultValue(15000).withDescription("Socket timeout(ms) in log RPC module");
36+
37+
/**
38+
* Max retry count of sending message in log RPC module
39+
*/
40+
public static final ConfigOption<Integer> LOG_RPC_SEND_RETRY_COUNT = ConfigOptions.key("stream.log.rpc.send-retry-count")
41+
.intType().defaultValue(3).withDescription("Max retry count of sending message in log RPC module");
42+
43+
/**
44+
* Server recovery time(in seconds) in log RPC module
45+
*/
46+
public static final ConfigOption<Integer> LOG_RPC_SERVER_RECOVERY_TIME = ConfigOptions.key("stream.log.rpc.server-recovery-time-in-sec")
47+
.intType().defaultValue(5).withDescription("Server recovery time(sec) in log RPC module");
48+
49+
/**
50+
* Max delay time(in seconds) in log RPC module. if reach the limit, the message will be dropped
51+
*/
52+
public static final ConfigOption<Integer> LOG_RPC_MAX_DELAY_TIME = ConfigOptions.key("stream.log.rpc.max-delay-time")
53+
.intType().defaultValue(60).withDescription("Max delay time(sec) in log RPC module");
54+
55+
/**
56+
* Token code key in log RPC auth module
57+
*/
58+
public static final ConfigOption<String> LOG_RPC_AUTH_TOKEN_CODE_KEY = ConfigOptions.key("stream.log.rpc.auth.token-code-key")
59+
.stringType().defaultValue("Token-Code").withDescription("Token code key in log RPC auth module");
60+
61+
/**
62+
* Token user key in log RPC auth module
63+
*/
64+
public static final ConfigOption<String> LOG_RPC_AUTH_TOKEN_USER_KEY = ConfigOptions.key("stream.log.rpc.auth.token-user-key")
65+
.stringType().defaultValue("Token-User").withDescription("Token user key in log RPC auth module");
66+
67+
/**
68+
* Token code in log RPC auth module
69+
*/
70+
public static final ConfigOption<String> LOG_RPC_AUTH_TOKEN_CODE = ConfigOptions.key("stream.log.rpc.auth.token-code")
71+
.stringType().defaultValue("STREAM-LOG").withDescription("Token code in log RPC auth module");
72+
73+
/**
74+
* Token user in log RPC auth module
75+
*/
76+
public static final ConfigOption<String> LOG_RPC_AUTH_TOKEN_USER = ConfigOptions.key("stream.log.rpc.auth.token-user")
77+
.stringType().defaultValue(System.getProperty("user.name")).withDescription("Token user in log RPC auth module");
78+
79+
/**
80+
* Cache size in log RPC module
81+
*/
82+
public static final ConfigOption<Integer> LOG_RPC_CACHE_SIZE = ConfigOptions.key("stream.log.rpc.cache.size")
83+
.intType().defaultValue(150).withDescription("Cache size in log RPC module");
84+
85+
/**
86+
* Max cache consume threads in log RPC module
87+
*/
88+
public static final ConfigOption<Integer> LOG_PRC_CACHE_MAX_CONSUME_THREAD = ConfigOptions.key("stream.log.rpc.cache.max-consume-thread")
89+
.intType().defaultValue(10).withDescription("Max cache consume threads in log RPC module");
90+
91+
/**
92+
* Buffer size in log RPC module
93+
*/
94+
public static final ConfigOption<Integer> LOG_RPC_BUFFER_SIZE = ConfigOptions.key("stream.log.rpc.buffer.size")
95+
.intType().defaultValue(50).withDescription("Buffer size in log RPC module");
96+
97+
/**
98+
* Buffer expire time(sec) in log RPC module
99+
*/
100+
public static final ConfigOption<Integer> LOG_RPC_BUFFER_EXPIRE_TIME = ConfigOptions.key("stream.log.rpc.buffer.expire-time-in-sec")
101+
.intType().defaultValue(2).withDescription("Buffer expire time (sec) in log RPC module");
102+
103+
/**
104+
* Log filter strategy list
105+
*/
106+
public static final ConfigOption<List<String>> LOG_FILTER_STRATEGIES = ConfigOptions.key("stream.log.filter.strategies")
107+
.stringType().asList().defaultValues("LevelMatch").withDescription("Log filter strategy list");
108+
109+
/**
110+
* Level value of LevelMatch filter strategy
111+
*/
112+
public static final ConfigOption<String> LOG_FILTER_LEVEL_MATCH = ConfigOptions.key("stream.log.filter.level-match.level")
113+
.stringType().defaultValue("ERROR").withDescription("Level value of LevelMatch filter strategy");
114+
115+
/**
116+
* Regex value of RegexMatch filter strategy
117+
*/
118+
public static final ConfigOption<String> LOG_FILTER_REGEX = ConfigOptions.key("stream.log.filter.regex.value")
119+
.stringType().defaultValue(".*").withDescription("Regex value of RegexMatch filter strategy");
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
com.webank.wedatasphere.streamis.jobmanager.log.collector.flink.FlinkStreamisConfigAutowired
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.log.collector.flink;
2+
3+
import org.apache.flink.configuration.Configuration;
4+
import org.apache.flink.configuration.GlobalConfiguration;
5+
import org.junit.Test;
6+
7+
import java.io.File;
8+
import java.util.Enumeration;
9+
import java.util.Objects;
10+
import java.util.Properties;
11+
12+
public class FlinkConfigurationLoadTest {
13+
@Test
14+
public void loadConfiguration() {
15+
String configDir = Objects.requireNonNull(FlinkConfigurationLoadTest.class.getResource("/")).getFile();
16+
Properties properties = System.getProperties();
17+
Enumeration<?> enumeration = properties.propertyNames();
18+
Configuration dynamicConfiguration = new Configuration();
19+
while(enumeration.hasMoreElements()){
20+
String prop = String.valueOf(enumeration.nextElement());
21+
dynamicConfiguration.setString(prop, properties.getProperty(prop));
22+
}
23+
GlobalConfiguration.loadConfiguration(configDir, dynamicConfiguration);
24+
}
25+
}

streamis-jobmanager/streamis-job-log/flink-streamis-log-collector/src/test/resources/flink-conf.yaml

Whitespace-only changes.

0 commit comments

Comments
 (0)