Commit 154abdc

Add module to collect spark container log.
1 parent 97ba00f commit 154abdc

File tree

8 files changed: +161 -23 lines changed

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>streamis-job-log</artifactId>
        <groupId>com.webank.wedatasphere.streamis</groupId>
        <version>0.2.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>xspark-streamis-log-collector</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.webank.wedatasphere.streamis</groupId>
            <artifactId>streamis-job-log-collector1x</artifactId>
            <version>${streamis.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <id>assemble</id>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <!-- install -->
                        <phase>install</phase>
                    </execution>
                </executions>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <appendAssemblyId>false</appendAssemblyId>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">

    <id>package</id>

    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <outputDirectory>/</outputDirectory>
            <unpack>true</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
package com.webank.wedatasphere.streamis.jobmanager.log.collector.spark;

import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig;
import com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired;

import java.util.Optional;

/**
 * Autoconfigure the streamis config in Spark environment
 */
public class SparkStreamisConfigAutowired implements StreamisConfigAutowired {

    private static final String APP_NAME_CONFIG = "app.name";

    private static final String SERVER_ADDRESS_CONFIG = "streamis.url";

    private static final String COLLECTOR_URI_CONFIG = "streamis.log.collector.uri";

    private static final String PROJECT_NAME_CONFIG = "project.name";

    private static final String DEFAULT_COLLECTOR_URI = "/api/rest_j/v1/streamis/streamJobManager/log/collect/events";
    @Override
    public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Builder builder) throws Exception {
        // Load the config from system properties
        Optional.ofNullable(System.getProperty(APP_NAME_CONFIG)).ifPresent(appName -> {
            String projectName = System.getProperty(PROJECT_NAME_CONFIG);
            if (null != projectName && !projectName.trim().equals("")){
                appName = projectName + "." + appName;
            }
            System.out.println("Spark env to streamis: application name =>" + appName);
            builder.setAppName(appName);
        });
        String serverAddress = System.getProperty(SERVER_ADDRESS_CONFIG);
        if (null != serverAddress && !serverAddress.trim().equals("")){
            if (serverAddress.endsWith("/")){
                serverAddress = serverAddress.substring(0, serverAddress.length() - 1);
            }
            String collectorUri = System.getProperty(COLLECTOR_URI_CONFIG, DEFAULT_COLLECTOR_URI);
            if (null != collectorUri && !collectorUri.trim().equals("")){
                if (!collectorUri.startsWith("/")){
                    collectorUri = "/" + collectorUri;
                }
                serverAddress += collectorUri;
            }
            System.out.println("Spark env to streamis: server address =>" + serverAddress);
            builder.setRpcAddress(serverAddress);
        }
        String user = System.getenv("USER");
        if (null == user || user.trim().equals("")){
            user = System.getProperty("user.name", "hadoop");
        }
        System.out.println("Spark env to streamis: log user =>" + user);
        builder.setRpcAuthTokenUser(user);
        return builder.build();
    }
}
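The autowired class above reads all of its settings from JVM system properties. Below is a minimal sketch that reproduces the property keys it looks for and the way the collector endpoint address is assembled; the demo class, the sample values, and the way the properties would be passed to the Spark driver are illustrative assumptions, only the keys and the default URI come from the class itself.

// Illustrative only: mirrors the address composition done in logAppenderConfig(...);
// this demo class is not part of the commit.
public class SparkAutowiredPropertiesDemo {
    public static void main(String[] args) {
        // Typically supplied to the Spark driver, e.g. -Dproject.name=... -Dapp.name=... -Dstreamis.url=...
        System.setProperty("project.name", "demo_project");
        System.setProperty("app.name", "demo_job");
        System.setProperty("streamis.url", "http://streamis-gateway:9001/");

        String appName = System.getProperty("project.name") + "." + System.getProperty("app.name");
        String serverAddress = System.getProperty("streamis.url");
        if (serverAddress.endsWith("/")) {
            serverAddress = serverAddress.substring(0, serverAddress.length() - 1);
        }
        String collectorUri = System.getProperty("streamis.log.collector.uri",
                "/api/rest_j/v1/streamis/streamJobManager/log/collect/events");
        System.out.println(appName + " -> " + serverAddress + collectorUri);
        // demo_project.demo_job -> http://streamis-gateway:9001/api/rest_j/v1/streamis/streamJobManager/log/collect/events
    }
}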
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
com.webank.wedatasphere.streamis.jobmanager.log.collector.spark.SparkStreamisConfigAutowired
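The one-line file above is a standard Java SPI registration for the StreamisConfigAutowired interface imported by the new class (the resource presumably lives under META-INF/services). A generic sketch of how such a registration is discovered with ServiceLoader follows; whether the collector core uses ServiceLoader or its own lookup is not shown in this commit.

import com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired;

import java.util.ServiceLoader;

public class AutowiredDiscoveryDemo {
    public static void main(String[] args) {
        // Iterates every implementation listed in META-INF/services for the interface,
        // e.g. the SparkStreamisConfigAutowired registered by this commit.
        ServiceLoader<StreamisConfigAutowired> loader = ServiceLoader.load(StreamisConfigAutowired.class);
        for (StreamisConfigAutowired autowired : loader) {
            System.out.println("Discovered config autowired plugin: " + autowired.getClass().getName());
        }
    }
}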

streamis-jobmanager/streamis-job-log/pom.xml

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@
        <module>job-log-collector/streamis-job-log-collector</module>
        <module>job-log-collector/streamis-job-log-collector1x</module>
        <module>job-log-collector/flink-streamis-log-collector</module>
+        <module>job-log-collector/xspark-streamis-log-collector</module>
        <module>streamis-job-log-server</module>
        <module>streamis-job-log-common</module>
    </modules>

streamis-jobmanager/streamis-job-log/streamis-job-log-common/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/entities/StreamisLogEvents.java

Lines changed: 1 addition & 0 deletions
@@ -108,4 +108,5 @@ private String joinEvents(StreamisLogEvent[] events, String separator){
        }
        return builder.toString();
    }
+
}

streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/restful/JobLogRestfulApi.java

Lines changed: 4 additions & 0 deletions
@@ -1,9 +1,12 @@
package com.webank.wedatasphere.streamis.jobmanager.log.server.restful;

+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.webank.wedatasphere.streamis.jobmanager.log.entities.StreamisLogEvent;
import com.webank.wedatasphere.streamis.jobmanager.log.server.entities.StreamisLogEvents;
import com.webank.wedatasphere.streamis.jobmanager.log.server.exception.StreamJobLogException;
import com.webank.wedatasphere.streamis.jobmanager.log.server.service.StreamisJobLogService;
import org.apache.commons.lang.StringUtils;
+import org.apache.linkis.common.utils.JsonUtils;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.security.SecurityFilter;
import org.slf4j.Logger;
@@ -15,6 +18,7 @@

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;

@RestController
@RequestMapping(path = "/streamis/streamJobManager/log")
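Only the imports of this controller change in these hunks; the added JsonProcessingException, StreamisLogEvent, JsonUtils and IOException imports suggest the endpoint will also deserialize JSON log payloads, but the method bodies are outside the diff. A rough sketch of that deserialization step with plain Jackson follows; the helper class and method are assumptions for illustration, not code from the commit, and the Linkis JsonUtils API is not shown here.

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

class LogPayloadParsingSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Generic JSON-to-bean step: Jackson's JsonProcessingException (imported in the diff)
    // is a subclass of IOException, so declaring IOException covers both.
    static <T> T parse(String json, Class<T> type) throws IOException {
        return MAPPER.readValue(json, type);
    }
}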

streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/StreamisJobLogStorage.java

Lines changed: 28 additions & 23 deletions
@@ -79,30 +79,35 @@ public JobLogBucket getOrCreateBucket(String userName, String appName, JobLogBuc
public synchronized void init() {
    if (Objects.isNull(monitorThread)){
        monitorThread = Utils.defaultScheduler().scheduleAtFixedRate(() -> {
-            Thread.currentThread().setName(StreamJobLogConfig.BUCKET_MONITOR_NAME.getValue());
-            long maxIdleTime = StreamJobLogConfig.BUCKET_MAX_IDLE_TIME.getValue().toLong();
-            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-            if (buckets.size() > 0) {
-                StringBuilder builder = new StringBuilder("Buckets in LogStorage: [\n");
-                buckets.forEach((bucketName, bucket) -> {
-                    JobLogBucketState bucketState = bucket.getBucketState();
-                    builder.append("bucket: [ name: ")
-                            .append(bucketName)
-                            .append(", path: ").append(bucketState.getBucketPath())
-                            .append(", parts: ").append(bucketState.getBucketParts())
-                            .append(", write-rate: ").append(bucketState.getBucketWriteRate()).append("/s")
-                            .append(", last-write-time: ").append(dateFormat.format(bucketState.getBucketWriteTime()))
-                            .append(" ]\n");
-                    if (bucketState.getBucketWriteTime() + maxIdleTime <= System.currentTimeMillis()){
-                        LOG.info("Close the idle bucket: [ name: {}, last-write-time: {} ]",
-                                bucketName, dateFormat.format(bucketState.getBucketWriteTime()));
-                        bucket.close();
-                        // Delete the bucket
-                        buckets.remove(bucketName);
-                    }
+            String threadName = Thread.currentThread().getName();
+            try {
+                Thread.currentThread().setName(StreamJobLogConfig.BUCKET_MONITOR_NAME.getValue());
+                long maxIdleTime = StreamJobLogConfig.BUCKET_MAX_IDLE_TIME.getValue().toLong();
+                SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                if (buckets.size() > 0) {
+                    StringBuilder builder = new StringBuilder("Buckets in LogStorage: [\n");
+                    buckets.forEach((bucketName, bucket) -> {
+                        JobLogBucketState bucketState = bucket.getBucketState();
+                        builder.append("bucket: [ name: ")
+                                .append(bucketName)
+                                .append(", path: ").append(bucketState.getBucketPath())
+                                .append(", parts: ").append(bucketState.getBucketParts())
+                                .append(", write-rate: ").append(bucketState.getBucketWriteRate()).append("/s")
+                                .append(", last-write-time: ").append(dateFormat.format(bucketState.getBucketWriteTime()))
+                                .append(" ]\n");
+                        if (bucketState.getBucketWriteTime() + maxIdleTime <= System.currentTimeMillis()) {
+                            LOG.info("Close the idle bucket: [ name: {}, last-write-time: {} ]",
+                                    bucketName, dateFormat.format(bucketState.getBucketWriteTime()));
+                            bucket.close();
+                            // Delete the bucket
+                            buckets.remove(bucketName);
+                        }

-                });
-                LOG.info(builder.toString());
+                    });
+                    LOG.info(builder.toString());
+                }
+            } finally {
+                Thread.currentThread().setName(threadName);
            }

        },BUCKET_MONITOR_INTERVAL.getValue().toLong(), BUCKET_MONITOR_INTERVAL.getValue().toLong(), TimeUnit.MILLISECONDS);
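The storage change wraps the periodic bucket-monitor body in try/finally so the pooled scheduler thread gets its original name back even when the monitor exits early or throws. A stripped-down sketch of that rename-and-restore pattern on a scheduled executor follows; the thread name, period and executor are illustrative, not the Streamis ones.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadRenamePatternDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            // Remember the pool thread's original name before renaming it for this run
            String originalName = Thread.currentThread().getName();
            try {
                Thread.currentThread().setName("Bucket-Monitor-Demo");
                // ... periodic housekeeping work goes here ...
            } finally {
                // Restore the name so later tasks on this pooled thread are not mislabeled
                Thread.currentThread().setName(originalName);
            }
        }, 1, 1, TimeUnit.SECONDS);
    }
}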
