Skip to content

Commit 27a9323

Browse files
committed
工作流节点支持Spark3
1 parent f5d714a commit 27a9323

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/conf/LinkisJobExecutionConfiguration.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ public class LinkisJobExecutionConfiguration {
8888

8989
public static final CommonVars<Boolean> LINKIS_DISCOVERY_ENABLE = CommonVars.apply("wds.dss.workflow.execution.linkis.discovery.enable", false);
9090
public static final CommonVars<String> EMBEDDED_FLOW_ID = CommonVars.apply("wds.dss.workflow.execution.subflow.flag","embeddedFlowId");
91+
92+
public static final CommonVars<Boolean> WORKFLOW_SPARK3_SWITCH = CommonVars.apply("wds.dss.workflow.spark3.switch", false);
93+
94+
public static final CommonVars<String> SPARK3_ENGINE_VERSION = CommonVars.apply("linkis.spark3.engine.version", "3.4.4");
95+
9196
public static boolean isLinkis1_X(Map<String, String> props) {
9297
return props.getOrDefault(LinkisJobExecutionConfiguration.LINKIS_VERSION_KEY,"")
9398
.startsWith("1.");

dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/service/impl/BuildJobActionImpl.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
import com.webank.wedatasphere.dss.linkis.node.execution.job.Job;
2323
import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob;
2424
import com.webank.wedatasphere.dss.linkis.node.execution.service.BuildJobAction;
25+
import org.apache.commons.collections.MapUtils;
2526
import org.apache.commons.lang3.SerializationUtils;
2627
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.linkis.common.conf.CommonVars;
2829
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
30+
import org.apache.linkis.manager.label.entity.engine.EngineType;
2931
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
3032
import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator;
3133
import org.apache.linkis.protocol.constants.TaskConstant;
@@ -142,7 +144,31 @@ public JobSubmitAction getSubmitAction(Job job) throws LinkisJobExecutionErrorEx
142144
engineTypeLabel.setVersion(NEBULA_ENGINE_VERSION.getValue());
143145
}
144146

145-
labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, engineTypeLabel.getStringValue());
147+
String stringValue = engineTypeLabel.getStringValue();
148+
// spark3开关开启,并且引擎是Spark
149+
if(WORKFLOW_SPARK3_SWITCH.getValue() && EngineType.SPARK().toString().equalsIgnoreCase(engineTypeLabel.getEngineType())){
150+
151+
Map<String,Object> variableMap = TaskUtils.getVariableMap(job.getParams());
152+
153+
// 判断sparkVersion参数为3,则使用spark3的引擎版本,否则使用spark默认引擎版本
154+
if(MapUtils.isNotEmpty(variableMap)
155+
&& variableMap.get("sparkVersion")!=null
156+
&& StringUtils.startsWithIgnoreCase(variableMap.get("sparkVersion").toString().trim(),"3")){
157+
158+
EngineTypeLabel spark3EngineType= new EngineTypeLabel();
159+
spark3EngineType.setEngineType(engineTypeLabel.getEngineType());
160+
spark3EngineType.setVersion(SPARK3_ENGINE_VERSION.getValue());
161+
stringValue = spark3EngineType.getStringValue();
162+
163+
logger.info("{} job name ,spark3 engineType stringValue is {}",job.getJobName(),stringValue);
164+
165+
}
166+
167+
}
168+
169+
logger.info("{} job name ,engineType stringValue is {}",job.getJobName(),stringValue);
170+
171+
labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, stringValue);
146172
labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, job.getUser() + "-" + LINKIS_JOB_CREATOR_1_X.getValue(job.getJobProps()));
147173
labels.put(LabelKeyConstant.CODE_TYPE_KEY, parseRunType(job.getEngineType(), job.getRunType(), job));
148174

0 commit comments

Comments
 (0)