Skip to content

Commit ad2ac58

Browse files
Update the plugins of DSS.
1 parent 301b8cb commit ad2ac58

23 files changed

+504
-548
lines changed

plugins/azkaban/linkis-jobtype/bin/install.sh

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,16 @@ fi
3838

3939
echo "start to subsitution conf"
4040
sed -i "s#jobtype.lib.dir.*#jobtype.lib.dir=$AZKABAN_JOBTYPE_DIR/linkis/lib#g" ${workDir}/private.properties
41-
sed -i "s#wds.linkis.gateway.url.*#wds.linkis.gateway.url=$LINKIS_GATEWAY_URL#g" ${workDir}/plugin.properties
41+
sed -i "s#wds.linkis.gateway.url.v0.*#wds.linkis.gateway.url.v0=$LINKIS_GATEWAY_URL#g" ${workDir}/plugin.properties
42+
sed -i "s#wds.linkis.gateway.url.v1.*#wds.linkis.gateway.url.v1=$LINKIS_GATEWAY_URL#g" ${workDir}/plugin.properties
4243
sed -i "s#wds.linkis.client.flow.author.user.token.*#wds.linkis.client.flow.author.user.token=$LINKIS_GATEWAY_TOKEN#g" ${workDir}/plugin.properties
4344
isSuccess "subsitution conf"
4445

4546
echo "$COPY Plugin"
46-
ssh -p $SSH_PORT $AZKABAN_EXECUTOR_HOST "cd $AZKABAN_JOBTYPE_DIR;rm -rf linkis-bak; mv -f linkis ../linkis-bak"
47+
##ssh -p $SSH_PORT $AZKABAN_EXECUTOR_HOST "cd $AZKABAN_JOBTYPE_DIR;rm -rf linkis-bak; mv -f linkis ../linkis-bak"
4748

4849
scp -P $SSH_PORT -r ${workDir} $AZKABAN_EXECUTOR_HOST:$AZKABAN_JOBTYPE_DIR
4950

5051
echo "reload jobType"
5152

52-
curl $AZKABAN_EXECUTOR_URL
53+
##curl $AZKABAN_EXECUTOR_URL

plugins/azkaban/linkis-jobtype/pom.xml

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<!--
33
~ Copyright 2019 WeBank
4-
~
54
~ Licensed under the Apache License, Version 2.0 (the "License");
6-
~ you may not use this file except in compliance with the License.
5+
~ you may not use this file except in compliance with the License.
76
~ You may obtain a copy of the License at
87
~
98
~ http://www.apache.org/licenses/LICENSE-2.0
@@ -23,12 +22,11 @@
2322
<parent>
2423
<artifactId>dss</artifactId>
2524
<groupId>com.webank.wedatasphere.dss</groupId>
26-
<version>0.9.1</version>
25+
<version>1.0.0</version>
2726
</parent>
28-
<groupId>com.webank.wedatasphere.dss</groupId>
2927
<artifactId>linkis-jobtype</artifactId>
3028
<properties>
31-
<azkaban.version>2.5.0</azkaban.version>
29+
<azkaban.version>0.6.0</azkaban.version>
3230
</properties>
3331

3432
<dependencies>
@@ -40,9 +38,19 @@
4038
</dependency>
4139

4240
<dependency>
43-
<groupId>com.linkedin.azkaban</groupId>
44-
<artifactId>azkaban</artifactId>
41+
<groupId>com.webank.wedatasphere.schedulis</groupId>
42+
<artifactId>azkaban-common</artifactId>
4543
<version>${azkaban.version}</version>
44+
<exclusions>
45+
<exclusion>
46+
<groupId>com.webank.azkaban</groupId>
47+
<artifactId>azkaban-spi</artifactId>
48+
</exclusion>
49+
<exclusion>
50+
<groupId>com.webank.azkaban</groupId>
51+
<artifactId>azkaban-db</artifactId>
52+
</exclusion>
53+
</exclusions>
4654
<scope>provided</scope>
4755
</dependency>
4856

@@ -77,6 +85,11 @@
7785
</plugin>
7886
<plugin>
7987
<artifactId>maven-assembly-plugin</artifactId>
88+
<configuration>
89+
<descriptorRefs>
90+
<descriptorRef>jar-with-dependencies</descriptorRef>
91+
</descriptorRefs>
92+
</configuration>
8093
<executions>
8194
<execution>
8295
<id>package</id>

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/AzkabanDssJobType.java

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
/*
22
* Copyright 2019 WeBank
3-
*
43
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
4+
* you may not use this file except in compliance with the License.
65
* You may obtain a copy of the License at
76
*
87
* http://www.apache.org/licenses/LICENSE-2.0
@@ -22,24 +21,24 @@
2221
import azkaban.utils.Props;
2322
import com.webank.wedatasphere.dss.linkis.node.execution.conf.LinkisJobExecutionConfiguration;
2423
import com.webank.wedatasphere.dss.linkis.node.execution.execution.impl.LinkisNodeExecutionImpl;
25-
import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob;
2624
import com.webank.wedatasphere.dss.linkis.node.execution.job.Job;
2725
import com.webank.wedatasphere.dss.linkis.node.execution.job.JobTypeEnum;
26+
import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob;
2827
import com.webank.wedatasphere.dss.linkis.node.execution.listener.LinkisExecutionListener;
2928
import com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.job.JobBuilder;
30-
import com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.log.AzkabanAppjointLog;
31-
import org.apache.log4j.Logger;
32-
import java.util.Map;
29+
import com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.log.AzkabanAppConnLog;
30+
import org.apache.commons.lang.StringUtils;
31+
import org.slf4j.Logger;
3332

33+
import java.text.ParseException;
34+
import java.text.SimpleDateFormat;
35+
import java.util.Date;
36+
import java.util.Map;
3437

3538

36-
/**
37-
* Created by peacewong on 2019/9/19.
38-
*/
3939
public class AzkabanDssJobType extends AbstractJob {
4040

4141

42-
4342
private static final String SENSITIVE_JOB_PROP_NAME_SUFFIX = "_X";
4443
private static final String SENSITIVE_JOB_PROP_VALUE_PLACEHOLDER = "[MASKED]";
4544
private static final String JOB_DUMP_PROPERTIES_IN_LOG = "job.dump.properties";
@@ -82,23 +81,51 @@ public AzkabanDssJobType(String jobId, Props sysProps, Props jobProps, Logger lo
8281

8382
}
8483

84+
8585
@Override
8686
public void run() throws Exception {
8787

8888
info("Start to execute job");
8989
logJobProperties();
90+
String runDate = getRunDate();
91+
if (StringUtils.isNotBlank(runDate)){
92+
this.jobPropsMap.put("run_date", runDate);
93+
}
9094
this.job = JobBuilder.getAzkanbanBuilder().setJobProps(this.jobPropsMap).build();
91-
this.job.setLogObj(new AzkabanAppjointLog(this.log));
95+
this.job.setLogObj(new AzkabanAppConnLog(this.log));
9296
if(JobTypeEnum.EmptyJob == ((LinkisJob)this.job).getJobType()){
9397
this.log.warn("This node is empty type");
9498
return;
9599
}
100+
// info("runtimeMap is " + job.getRuntimeParams());
101+
//job.getRuntimeParams().put("workspace", getWorkspace(job.getUser()));
102+
info("runtimeMap is " + job.getRuntimeParams());
96103
LinkisNodeExecutionImpl.getLinkisNodeExecution().runJob(this.job);
97-
LinkisNodeExecutionImpl.getLinkisNodeExecution().waitForComplete(this.job);
104+
105+
try {
106+
LinkisNodeExecutionImpl.getLinkisNodeExecution().waitForComplete(this.job);
107+
} catch (Exception e) {
108+
this.log.warn("Failed to execute job", e);
109+
//String reason = LinkisNodeExecutionImpl.getLinkisNodeExecution().getLog(this.job);
110+
//this.log.error("Reason for failure: " + reason);
111+
throw e;
112+
}
113+
try {
114+
String endLog = LinkisNodeExecutionImpl.getLinkisNodeExecution().getLog(this.job);
115+
this.log.info(endLog);
116+
} catch (Throwable e){
117+
this.log.info("Failed to get log", e);
118+
}
98119

99120
LinkisExecutionListener listener = (LinkisExecutionListener)LinkisNodeExecutionImpl.getLinkisNodeExecution();
100121
listener.onStatusChanged(null, LinkisNodeExecutionImpl.getLinkisNodeExecution().getState(this.job),this.job);
101-
int resultSize = LinkisNodeExecutionImpl.getLinkisNodeExecution().getResultSize(this.job);
122+
int resultSize = 0;
123+
try{
124+
resultSize = LinkisNodeExecutionImpl.getLinkisNodeExecution().getResultSize(this.job);
125+
}catch(final Throwable t){
126+
this.log.error("failed to get result size");
127+
resultSize = -1;
128+
}
102129
for(int i =0; i < resultSize; i++){
103130
this.log.info("The content of the " + (i + 1) + "th resultset is :"
104131
+ LinkisNodeExecutionImpl.getLinkisNodeExecution().getResult(this.job, i, LinkisJobExecutionConfiguration.RESULT_PRINT_SIZE.getValue(this.jobPropsMap)));
@@ -109,7 +136,7 @@ public void run() throws Exception {
109136

110137
@Override
111138
public void cancel() throws Exception {
112-
super.cancel();
139+
//super.cancel();
113140
LinkisNodeExecutionImpl.getLinkisNodeExecution().cancel(this.job);
114141
isCanceled = true;
115142
warn("This job has been canceled");
@@ -149,4 +176,37 @@ private void logJobProperties() {
149176
}
150177
}
151178

179+
private String getRunDate(){
180+
this.info("begin to get run date");
181+
if (this.jobProps != null &&
182+
this.jobProps.getBoolean(JOB_DUMP_PROPERTIES_IN_LOG, true)) {
183+
try {
184+
for (final Map.Entry<String, String> entry : this.jobPropsMap.entrySet()) {
185+
final String key = entry.getKey();
186+
final String value = key.endsWith(SENSITIVE_JOB_PROP_NAME_SUFFIX) ?
187+
SENSITIVE_JOB_PROP_VALUE_PLACEHOLDER :
188+
entry.getValue();
189+
if ("azkaban.flow.start.timestamp".equals(key)){
190+
this.info("run time is " + value);
191+
String runDateNow = value.substring(0, 10).replaceAll("-", "");
192+
this.info("run date now is " + runDateNow);
193+
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
194+
try {
195+
Date date = simpleDateFormat.parse(runDateNow);
196+
//因为date已经当天的00:00:00 减掉12小时 就是昨天的时间
197+
String runDate = simpleDateFormat.format(new Date(date.getTime() - 24 * 60 * 60 * 1000));
198+
this.info("runDate is " + runDate);
199+
return runDate;
200+
} catch (ParseException e) {
201+
this.log.error("failed to parse run date " + runDateNow, e);
202+
}
203+
}
204+
}
205+
} catch (final Exception ex) {
206+
this.log.error("failed to log job properties ", ex);
207+
}
208+
}
209+
return null;
210+
}
211+
152212
}

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/conf/LinkisJobTypeConf.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
/*
22
* Copyright 2019 WeBank
3-
*
43
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
4+
* you may not use this file except in compliance with the License.
65
* You may obtain a copy of the License at
76
*
87
* http://www.apache.org/licenses/LICENSE-2.0
@@ -19,15 +18,15 @@
1918

2019
import com.webank.wedatasphere.linkis.common.conf.CommonVars;
2120

22-
/**
23-
* Created by peacewong on 2019/11/3.
24-
*/
21+
2522
public class LinkisJobTypeConf {
2623

2724
public static final String COMMAND = "command";
2825

2926
public static final String JOB_ID = "azkaban.job.id";
3027

28+
public static final String DSS_LABELS_KEY = "labels";
29+
3130
public static final String FLOW_NAME = "azkaban.flow.flowid";
3231

3332
public static final String PROJECT_ID = "azkaban.flow.projectid";
@@ -41,12 +40,8 @@ public class LinkisJobTypeConf {
4140

4241
public static final String FLOW_SUBMIT_USER = "azkaban.flow.submituser";
4342

44-
public static final String READ_NODE_TOKEN = "read.nodes";
45-
46-
public static final String SHARED_NODE_TOKEN = "share.num";
47-
4843
public static final String MSG_SAVE_KEY = "msg.savekey";
4944

50-
public final static CommonVars<String> SIGNAL_NODES = CommonVars.apply("wds.dss.flow.signal.nodes","linkis.appjoint.eventchecker.eventreceiver");
45+
public final static CommonVars<String> SIGNAL_NODES = CommonVars.apply("wds.dss.flow.signal.nodes","linkis.appconn.eventchecker.eventreceiver");
5146

5247
}

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/job/AzkabanAppJointLinkisJob.java renamed to plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/job/AzkabanAppConnLinkisJob.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
/*
22
* Copyright 2019 WeBank
3-
*
43
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
4+
* you may not use this file except in compliance with the License.
65
* You may obtain a copy of the License at
76
*
87
* http://www.apache.org/licenses/LICENSE-2.0
@@ -17,22 +16,15 @@
1716

1817
package com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.job;
1918

20-
import com.webank.wedatasphere.dss.linkis.node.execution.job.AbstractAppJointLinkisJob;
19+
import com.webank.wedatasphere.dss.linkis.node.execution.job.AbstractAppConnLinkisJob;
2120
import com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.conf.LinkisJobTypeConf;
22-
import org.apache.commons.lang.StringUtils;
2321

2422

25-
/**
26-
* Created by peacewong on 2019/11/3.
27-
*/
28-
public class AzkabanAppJointLinkisJob extends AbstractAppJointLinkisJob {
23+
public class AzkabanAppConnLinkisJob extends AbstractAppConnLinkisJob {
2924

3025

3126
@Override
3227
public String getSubmitUser() {
33-
if (StringUtils.isEmpty(getJobProps().get(LinkisJobTypeConf.FLOW_SUBMIT_USER))){
34-
return getJobProps().get(LinkisJobTypeConf.PROXY_USER);
35-
}
3628
return getJobProps().get(LinkisJobTypeConf.FLOW_SUBMIT_USER);
3729
}
3830

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/job/AzkabanAppJointLinkisReadJob.java

Lines changed: 0 additions & 56 deletions
This file was deleted.

0 commit comments

Comments
 (0)