
Commit 03d8f89

Merge pull request #391 from wushengyeyouya/dev-1.0.0
Update the plugins of DSS and README files.
2 parents 37a1f9e + fb87275 commit 03d8f89

25 files changed: +596 −676 lines changed

README-ZH.md

Lines changed: 73 additions & 111 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 19 additions & 17 deletions
@@ -33,75 +33,77 @@ Please be patient, it will take some time to load gif.
 
        c. [Qualitis](https://github.com/WeBankFinTech/Qualitis) - Data Quality Management Tool
 
-       d. [Azkaban](https://azkaban.github.io/) - Batch workflow job scheduler
+       d. [Schedulis](https://github.com/WeBankFinTech/Schedulis) - Batch workflow job scheduler
+
+       f. [Exchangis](https://github.com/WeBankFinTech/Exchangis) - Data Exchange Tool
 
 ![DSS one-stop video](images/en_US/readme/onestop.gif)
 
-### 2. AppJoint, based on Linkis,defines a unique design concept
+### 2. AppConn, based on Linkis,defines a unique design concept
 
-       AppJoint——application joint, defining unified front-end and back-end
+       AppConn——application connector, defining unified front-end and back-end
 integration specifications, can quickly and easily integrate with external data application systems,
 making them as part of DSS data application development.
 
-       DSS arranges multiple AppJoints in series to form a workflow that supports real-time execution and scheduled execution. Users can complete the entire process development of data applications with simple drag and drop operations.
+       DSS arranges multiple AppConns in series to form a workflow that supports real-time execution and scheduled execution. Users can complete the entire process development of data applications with simple drag and drop operations.
 
-       Since AppJoint is integrated with Linkis, the external data application system shares the capabilities of resource management, concurrent limiting, and high performance. AppJoint also allows sharable context across system level and completely gets away from application silos.
+       Since AppConn is integrated with Linkis, the external data application system shares the capabilities of resource management, concurrent limiting, and high performance. AppConn also allows sharable context across system level and completely gets away from application silos.
 
 ### 3. Project, as the management unit
 
        With Project as the management unit, DSS organizes and manages the business applications of each data application system, and defines a set of common standards for collaborative development of projects across data application systems.
 
 ### 4. Integrated data application components
 
-      a. Azkaban AppJoint —— Batch workflow job scheduler
+      a. Schedulis AppConn —— Batch workflow job scheduler
 
          Many data applications developed by users usually require periodic scheduling capability.
 
         At present, the open source scheduling system in the community is pretty unfriendly to integrate with other data application systems.
 
-         DSS implements Azkaban AppJoint, which allows users to publish DSS workflows to Azkaban for regular scheduling.
+         DSS implements Schedulis AppConn, which allows users to publish DSS workflows to Azkaban for regular scheduling.
 
          DSS also defines standard and generic workflow parsing and publishing specifications for scheduling systems, allowing other scheduling systems to easily achieve low-cost integration with DSS.
 
 ![Azkaban](images/en_US/readme/Azkaban_AppJoint.gif)
 
-      b. Scriptis AppJoint —— Data Development IDE Tool
+      b. Scriptis AppConn —— Data Development IDE Tool
 
          What is [Scriptis](https://github.com/WeBankFinTech/Scriptis)?
 
         Scriptis is for interactive data analysis with script development(SQL, Pyspark, HiveQL), task submission(Spark, Hive), UDF, function, resource management and intelligent diagnosis.
 
-         Scriptis AppJoint integrates the data development capabilities of Scriptis to DSS, and allows various script types of Scriptis to serve as nodes in the DSS workflow to participate in the application development process.
+         Scriptis AppConn integrates the data development capabilities of Scriptis to DSS, and allows various script types of Scriptis to serve as nodes in the DSS workflow to participate in the application development process.
 
          Currently supports HiveSQL, SparkSQL, Pyspark, Scala and other script node types.
 
 ![Scriptis](images/en_US/readme/Scriptis_AppJoint.gif)
 
-      c. Visualis AppJoint —— Data Visualization Tool
+      c. Visualis AppConn —— Data Visualization Tool
 
          What is [Visualis](https://github.com/WeBankFinTech/Visualis)?
 
         Visualis is a BI tool for data visualization. It provides financial-grade data visualization capabilities on the basis of data security and permissions, based on the open source project Davinci contributed by CreditEase.
 
-         Visualis AppJoint integrates data visualization capabilities to DSS, and allows displays and dashboards, as nodes of DSS workflows, to be associated with upstream data market.
+         Visualis AppConn integrates data visualization capabilities to DSS, and allows displays and dashboards, as nodes of DSS workflows, to be associated with upstream data market.
 
 ![Visualis](images/en_US/readme/Visualis_AppJoint.gif)
 
-      d. Qualitis AppJoint —— Data quality management Tool
+      d. Qualitis AppConn —— Data quality management Tool
 
-         Qualitis AppJoint integrates data quality verification capabilities for DSS, allows Qualitis as a node in DSS workflow
+         Qualitis AppConn integrates data quality verification capabilities for DSS, allows Qualitis as a node in DSS workflow
 
 ![Qualitis](images/en_US/readme/Qualitis_AppJoint.gif)
 
-      e. Data Sender——Sender AppJoint
+      e. Data Sender——Sender AppConn
 
-         Sender AppJoint provides data delivery capability for DSS. Currently it supports the SendEmail node type, and the result sets of all other nodes can be sent via email.
+         Sender AppConn provides data delivery capability for DSS. Currently it supports the SendEmail node type, and the result sets of all other nodes can be sent via email.
 
          For example, the SendEmail node can directly send the screen shot of a display as an email.
 
-      f. Signal AppJoint —— Signal Nodes
+      f. Signal AppConn —— Signal Nodes
 
-         Signal AppJoint is used to strengthen the correlation between business and process while keeping them decoupled.
+         Signal AppConn is used to strengthen the correlation between business and process while keeping them decoupled.
 
          DataChecker Node:Checks whether a table or partition exists.
 

plugins/azkaban/linkis-jobtype/bin/install.sh

Lines changed: 4 additions & 3 deletions
@@ -38,15 +38,16 @@ fi
 
 echo "start to subsitution conf"
 sed -i "s#jobtype.lib.dir.*#jobtype.lib.dir=$AZKABAN_JOBTYPE_DIR/linkis/lib#g" ${workDir}/private.properties
-sed -i "s#wds.linkis.gateway.url.*#wds.linkis.gateway.url=$LINKIS_GATEWAY_URL#g" ${workDir}/plugin.properties
+sed -i "s#wds.linkis.gateway.url.v0.*#wds.linkis.gateway.url.v0=$LINKIS_GATEWAY_URL#g" ${workDir}/plugin.properties
+sed -i "s#wds.linkis.gateway.url.v1.*#wds.linkis.gateway.url.v1=$LINKIS_GATEWAY_URL#g" ${workDir}/plugin.properties
 sed -i "s#wds.linkis.client.flow.author.user.token.*#wds.linkis.client.flow.author.user.token=$LINKIS_GATEWAY_TOKEN#g" ${workDir}/plugin.properties
 isSuccess "subsitution conf"
 
 echo "$COPY Plugin"
-ssh -p $SSH_PORT $AZKABAN_EXECUTOR_HOST "cd $AZKABAN_JOBTYPE_DIR;rm -rf linkis-bak; mv -f linkis ../linkis-bak"
+##ssh -p $SSH_PORT $AZKABAN_EXECUTOR_HOST "cd $AZKABAN_JOBTYPE_DIR;rm -rf linkis-bak; mv -f linkis ../linkis-bak"
 
 scp -P $SSH_PORT -r ${workDir} $AZKABAN_EXECUTOR_HOST:$AZKABAN_JOBTYPE_DIR
 
 echo "reload jobType"
 
-curl $AZKABAN_EXECUTOR_URL
+##curl $AZKABAN_EXECUTOR_URL
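The installer now rewrites two gateway keys instead of one, so the Linkis gateway address is expected under both a v0 and a v1 property. As an illustration only, a plugin.properties after the substitutions above might read as follows (host, port, and token values are invented, not taken from this commit):

```properties
# hypothetical values written by install.sh's sed substitutions
wds.linkis.gateway.url.v0=http://linkis-gateway:9001
wds.linkis.gateway.url.v1=http://linkis-gateway:9001
wds.linkis.client.flow.author.user.token=example-auth-token
```

Since sed replaces everything after each key prefix, the keys must already exist in the shipped plugin.properties for the substitution to take effect.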

plugins/azkaban/linkis-jobtype/pom.xml

Lines changed: 20 additions & 7 deletions
@@ -1,9 +1,8 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
   ~ Copyright 2019 WeBank
-  ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
-  ~ you may not use this file except in compliance with the License.
+  ~ you may not use this file except in compliance with the License.
   ~ You may obtain a copy of the License at
   ~
   ~ http://www.apache.org/licenses/LICENSE-2.0
@@ -23,12 +22,11 @@
   <parent>
     <artifactId>dss</artifactId>
     <groupId>com.webank.wedatasphere.dss</groupId>
-    <version>0.9.1</version>
+    <version>1.0.0</version>
   </parent>
-  <groupId>com.webank.wedatasphere.dss</groupId>
   <artifactId>linkis-jobtype</artifactId>
   <properties>
-    <azkaban.version>2.5.0</azkaban.version>
+    <azkaban.version>0.6.1</azkaban.version>
   </properties>
 
   <dependencies>
@@ -40,9 +38,19 @@
     </dependency>
 
     <dependency>
-      <groupId>com.linkedin.azkaban</groupId>
-      <artifactId>azkaban</artifactId>
+      <groupId>com.webank.wedatasphere.schedulis</groupId>
+      <artifactId>azkaban-common</artifactId>
       <version>${azkaban.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.webank.azkaban</groupId>
+          <artifactId>azkaban-spi</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.webank.azkaban</groupId>
+          <artifactId>azkaban-db</artifactId>
+        </exclusion>
+      </exclusions>
       <scope>provided</scope>
     </dependency>
 
@@ -77,6 +85,11 @@
       </plugin>
       <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
         <executions>
           <execution>
             <id>package</id>

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/AzkabanDssJobType.java

Lines changed: 74 additions & 14 deletions
@@ -1,8 +1,7 @@
 /*
  * Copyright 2019 WeBank
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
+ * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
@@ -22,24 +21,24 @@
 import azkaban.utils.Props;
 import com.webank.wedatasphere.dss.linkis.node.execution.conf.LinkisJobExecutionConfiguration;
 import com.webank.wedatasphere.dss.linkis.node.execution.execution.impl.LinkisNodeExecutionImpl;
-import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob;
 import com.webank.wedatasphere.dss.linkis.node.execution.job.Job;
 import com.webank.wedatasphere.dss.linkis.node.execution.job.JobTypeEnum;
+import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob;
 import com.webank.wedatasphere.dss.linkis.node.execution.listener.LinkisExecutionListener;
 import com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.job.JobBuilder;
-import com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.log.AzkabanAppjointLog;
-import org.apache.log4j.Logger;
-import java.util.Map;
+import com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.log.AzkabanAppConnLog;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
 
 
-/**
- * Created by peacewong on 2019/9/19.
- */
 public class AzkabanDssJobType extends AbstractJob {
 
 
-
     private static final String SENSITIVE_JOB_PROP_NAME_SUFFIX = "_X";
     private static final String SENSITIVE_JOB_PROP_VALUE_PLACEHOLDER = "[MASKED]";
     private static final String JOB_DUMP_PROPERTIES_IN_LOG = "job.dump.properties";
@@ -82,23 +81,51 @@ public AzkabanDssJobType(String jobId, Props sysProps, Props jobProps, Logger lo
 
     }
 
+
     @Override
     public void run() throws Exception {
 
         info("Start to execute job");
         logJobProperties();
+        String runDate = getRunDate();
+        if (StringUtils.isNotBlank(runDate)){
+            this.jobPropsMap.put("run_date", runDate);
+        }
         this.job = JobBuilder.getAzkanbanBuilder().setJobProps(this.jobPropsMap).build();
-        this.job.setLogObj(new AzkabanAppjointLog(this.log));
+        this.job.setLogObj(new AzkabanAppConnLog(this.log));
         if(JobTypeEnum.EmptyJob == ((LinkisJob)this.job).getJobType()){
             this.log.warn("This node is empty type");
             return;
         }
+        // info("runtimeMap is " + job.getRuntimeParams());
+        //job.getRuntimeParams().put("workspace", getWorkspace(job.getUser()));
+        info("runtimeMap is " + job.getRuntimeParams());
         LinkisNodeExecutionImpl.getLinkisNodeExecution().runJob(this.job);
-        LinkisNodeExecutionImpl.getLinkisNodeExecution().waitForComplete(this.job);
+
+        try {
+            LinkisNodeExecutionImpl.getLinkisNodeExecution().waitForComplete(this.job);
+        } catch (Exception e) {
+            this.log.warn("Failed to execute job", e);
+            //String reason = LinkisNodeExecutionImpl.getLinkisNodeExecution().getLog(this.job);
+            //this.log.error("Reason for failure: " + reason);
+            throw e;
+        }
+        try {
+            String endLog = LinkisNodeExecutionImpl.getLinkisNodeExecution().getLog(this.job);
+            this.log.info(endLog);
+        } catch (Throwable e){
+            this.log.info("Failed to get log", e);
+        }
 
         LinkisExecutionListener listener = (LinkisExecutionListener)LinkisNodeExecutionImpl.getLinkisNodeExecution();
         listener.onStatusChanged(null, LinkisNodeExecutionImpl.getLinkisNodeExecution().getState(this.job),this.job);
-        int resultSize = LinkisNodeExecutionImpl.getLinkisNodeExecution().getResultSize(this.job);
+        int resultSize = 0;
+        try{
+            resultSize = LinkisNodeExecutionImpl.getLinkisNodeExecution().getResultSize(this.job);
+        }catch(final Throwable t){
+            this.log.error("failed to get result size");
+            resultSize = -1;
+        }
         for(int i =0; i < resultSize; i++){
             this.log.info("The content of the " + (i + 1) + "th resultset is :"
                     + LinkisNodeExecutionImpl.getLinkisNodeExecution().getResult(this.job, i, LinkisJobExecutionConfiguration.RESULT_PRINT_SIZE.getValue(this.jobPropsMap)));
@@ -109,7 +136,7 @@ public void run() throws Exception {
 
     @Override
     public void cancel() throws Exception {
-        super.cancel();
+        //super.cancel();
         LinkisNodeExecutionImpl.getLinkisNodeExecution().cancel(this.job);
         isCanceled = true;
         warn("This job has been canceled");
@@ -149,4 +176,37 @@ private void logJobProperties() {
         }
     }
 
+    private String getRunDate(){
+        this.info("begin to get run date");
+        if (this.jobProps != null &&
+                this.jobProps.getBoolean(JOB_DUMP_PROPERTIES_IN_LOG, true)) {
+            try {
+                for (final Map.Entry<String, String> entry : this.jobPropsMap.entrySet()) {
+                    final String key = entry.getKey();
+                    final String value = key.endsWith(SENSITIVE_JOB_PROP_NAME_SUFFIX) ?
+                            SENSITIVE_JOB_PROP_VALUE_PLACEHOLDER :
+                            entry.getValue();
+                    if ("azkaban.flow.start.timestamp".equals(key)){
+                        this.info("run time is " + value);
+                        String runDateNow = value.substring(0, 10).replaceAll("-", "");
+                        this.info("run date now is " + runDateNow);
+                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
+                        try {
+                            Date date = simpleDateFormat.parse(runDateNow);
+                            // date is already 00:00:00 of the current day; subtracting one day yields yesterday
+                            String runDate = simpleDateFormat.format(new Date(date.getTime() - 24 * 60 * 60 * 1000));
+                            this.info("runDate is " + runDate);
+                            return runDate;
+                        } catch (ParseException e) {
+                            this.log.error("failed to parse run date " + runDateNow, e);
+                        }
+                    }
+                }
+            } catch (final Exception ex) {
+                this.log.error("failed to log job properties ", ex);
+            }
+        }
+        return null;
+    }
+
 }
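The new getRunDate() logic can be exercised in isolation. The sketch below is a hypothetical standalone reduction of what the commit adds: it takes the date part of azkaban.flow.start.timestamp, parses it as yyyyMMdd, and subtracts one day so that run_date names the previous day's data partition. The class name and the sample timestamp are invented for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical standalone reduction of the run_date derivation in this commit.
public class RunDateSketch {

    // startTimestamp is expected in the form yyyy-MM-dd'T'HH:mm:ss...,
    // as Azkaban supplies it via azkaban.flow.start.timestamp.
    public static String runDateFrom(String startTimestamp) throws ParseException {
        // keep only the date part and drop the dashes: "2021-06-15..." -> "20210615"
        String today = startTimestamp.substring(0, 10).replaceAll("-", "");
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd");
        Date midnight = fmt.parse(today); // 00:00:00 of the start day
        // one day earlier is the previous day's partition
        return fmt.format(new Date(midnight.getTime() - 24L * 60 * 60 * 1000));
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(runDateFrom("2021-06-15T10:20:30.000+08:00"));
    }
}
```

The same caveat applies as in the committed code: subtracting a fixed 24 hours from local midnight can mislabel run_date on the rare day when a timezone transition makes the previous day not 24 hours long; java.time's LocalDate.minusDays(1) would avoid that.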

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/conf/LinkisJobTypeConf.java

Lines changed: 5 additions & 10 deletions
@@ -1,8 +1,7 @@
 /*
  * Copyright 2019 WeBank
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
+ * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
@@ -19,15 +18,15 @@
 
 import com.webank.wedatasphere.linkis.common.conf.CommonVars;
 
-/**
- * Created by peacewong on 2019/11/3.
- */
+
 public class LinkisJobTypeConf {
 
     public static final String COMMAND = "command";
 
     public static final String JOB_ID = "azkaban.job.id";
 
+    public static final String DSS_LABELS_KEY = "labels";
+
     public static final String FLOW_NAME = "azkaban.flow.flowid";
 
     public static final String PROJECT_ID = "azkaban.flow.projectid";
@@ -41,12 +40,8 @@ public class LinkisJobTypeConf {
 
     public static final String FLOW_SUBMIT_USER = "azkaban.flow.submituser";
 
-    public static final String READ_NODE_TOKEN = "read.nodes";
-
-    public static final String SHARED_NODE_TOKEN = "share.num";
-
     public static final String MSG_SAVE_KEY = "msg.savekey";
 
-    public final static CommonVars<String> SIGNAL_NODES = CommonVars.apply("wds.dss.flow.signal.nodes","linkis.appjoint.eventchecker.eventreceiver");
+    public final static CommonVars<String> SIGNAL_NODES = CommonVars.apply("wds.dss.flow.signal.nodes","linkis.appconn.eventchecker.eventreceiver");
 
 }
