Skip to content

Commit 8d906f4

Browse files
committed
变更sparkVersion参数从节点runtime中获取
1 parent 51b7fde commit 8d906f4

File tree

2 files changed

+31
-15
lines changed

2 files changed

+31
-15
lines changed

dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/conf/LinkisJobExecutionConfiguration.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,6 @@ public class LinkisJobExecutionConfiguration {
8989
public static final CommonVars<Boolean> LINKIS_DISCOVERY_ENABLE = CommonVars.apply("wds.dss.workflow.execution.linkis.discovery.enable", false);
9090
public static final CommonVars<String> EMBEDDED_FLOW_ID = CommonVars.apply("wds.dss.workflow.execution.subflow.flag","embeddedFlowId");
9191

92-
public static final CommonVars<Boolean> WORKFLOW_SPARK3_SWITCH = CommonVars.apply("wds.dss.workflow.spark3.switch", false);
93-
9492
public static final CommonVars<String> SPARK3_ENGINE_VERSION = CommonVars.apply("linkis.spark3.engine.version", "3.4.4");
9593

9694
public static boolean isLinkis1_X(Map<String, String> props) {

dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/service/impl/BuildJobActionImpl.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.commons.lang3.SerializationUtils;
2727
import org.apache.commons.lang3.StringUtils;
2828
import org.apache.linkis.common.conf.CommonVars;
29+
import org.apache.linkis.manager.label.conf.LabelCommonConfig;
2930
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
3031
import org.apache.linkis.manager.label.entity.engine.EngineType;
3132
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
@@ -145,23 +146,22 @@ public JobSubmitAction getSubmitAction(Job job) throws LinkisJobExecutionErrorEx
145146
}
146147

147148
String stringValue = engineTypeLabel.getStringValue();
148-
// spark3开关开启,并且引擎是Spark
149-
if (WORKFLOW_SPARK3_SWITCH.getValue() && EngineType.SPARK().toString().equalsIgnoreCase(engineTypeLabel.getEngineType())) {
150149

151-
Map<String, Object> variableMap = TaskUtils.getVariableMap(job.getParams());
152150

153-
// 判断sparkVersion参数为3,则使用spark3的引擎版本,否则使用spark默认引擎版本
154-
if (MapUtils.isNotEmpty(variableMap)
155-
&& variableMap.get("sparkVersion") != null
156-
&& StringUtils.equalsIgnoreCase(variableMap.get("sparkVersion").toString().trim(), "3")) {
157-
158-
EngineTypeLabel spark3EngineType = new EngineTypeLabel();
159-
spark3EngineType.setEngineType(engineTypeLabel.getEngineType());
160-
spark3EngineType.setVersion(SPARK3_ENGINE_VERSION.getValue());
161-
stringValue = spark3EngineType.getStringValue();
151+
//TODO 当默认引擎为spark3 可以去掉此段if代码
152+
if (EngineType.SPARK().toString().equalsIgnoreCase(engineTypeLabel.getEngineType())) {
162153

163-
logger.info("{} job name ,spark3 engineType stringValue is {}", job.getJobName(), stringValue);
154+
String sparkVersion = getSparkVersion(job.getParams());
164155

156+
// 判断sparkVersion参数为3,则使用spark3的引擎版本,否则使用spark默认引擎版本
157+
if (StringUtils.isNotEmpty(sparkVersion) &&
158+
StringUtils.equalsIgnoreCase(sparkVersion.trim(), "3")) {
159+
160+
EngineTypeLabel sparkEngineType = new EngineTypeLabel();
161+
sparkEngineType.setEngineType(engineTypeLabel.getEngineType());
162+
sparkEngineType.setVersion(SPARK3_ENGINE_VERSION.getValue());
163+
stringValue = sparkEngineType.getStringValue();
164+
logger.info("{} job name ,spark engineType stringValue is {}", job.getJobName(), stringValue);
165165
}
166166

167167
}
@@ -282,4 +282,22 @@ private String parseRunType(String engineType, String runType, Job job) {
282282
private void enrichParams(Job job) {
283283
job.getRuntimeParams().put("nodeType", job.getRunType());
284284
}
285+
286+
private String getSparkVersion(Map<String, Object> params) {
287+
288+
String sparkVersion = null;
289+
290+
if (params.get("configuration") != null) {
291+
Map<String, Object> configurationMap = (Map<String, Object>) params.get("configuration");
292+
if (configurationMap.get("runtime") != null) {
293+
Map<String, Object> runtimeMap = (Map<String, Object>) configurationMap.get("runtime");
294+
if (runtimeMap.get("sparkVersion") != null) {
295+
sparkVersion = (String) runtimeMap.get("sparkVersion");
296+
}
297+
}
298+
}
299+
300+
return sparkVersion;
301+
}
302+
285303
}

0 commit comments

Comments
 (0)