Commit 51b7fde

Workflow nodes support Spark3
1 parent 27a9323 commit 51b7fde

File tree

1 file changed: +16 -15 lines changed
  • dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/service/impl


dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/service/impl/BuildJobActionImpl.java

Lines changed: 16 additions & 15 deletions
@@ -50,7 +50,7 @@ public class BuildJobActionImpl implements BuildJobAction {
 
     private Logger logger = LoggerFactory.getLogger(BuildJobActionImpl.class);
     private static BuildJobAction buildJobAction = new BuildJobActionImpl();
-    private static final String NEBULA = "nebula";
+    private static final String NEBULA = "nebula";
     private static final CommonVars<String> NEBULA_ENGINE_VERSION =
             CommonVars.apply("wds.linkis.nebula.engine.version", "3.0.0");
 
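The Spark3 branch in the next hunk reads two configuration values, WORKFLOW_SPARK3_SWITCH and SPARK3_ENGINE_VERSION, that are defined outside the changed lines. Assuming they follow the same CommonVars pattern as NEBULA_ENGINE_VERSION above, they would look roughly like this sketch; the key names and default values are illustrative assumptions, not taken from the commit:

    // Hypothetical definitions mirroring the NEBULA_ENGINE_VERSION pattern above;
    // the real keys and defaults live elsewhere in the DSS codebase.
    private static final CommonVars<Boolean> WORKFLOW_SPARK3_SWITCH =
            CommonVars.apply("wds.dss.workflow.spark3.switch", false);
    private static final CommonVars<String> SPARK3_ENGINE_VERSION =
            CommonVars.apply("wds.linkis.spark3.engine.version", "3.2.1");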
@@ -140,41 +140,41 @@ public JobSubmitAction getSubmitAction(Job job) throws LinkisJobExecutionErrorEx
 
         EngineTypeLabel engineTypeLabel = EngineTypeLabelCreator.createEngineTypeLabel(parseAppConnEngineType(job.getEngineType(), job));
         // TODO: this special hard-coded logic should be removed after upgrading to Linkis 1.7.0
-        if(NEBULA.equalsIgnoreCase( engineTypeLabel.getEngineType())){
+        if (NEBULA.equalsIgnoreCase(engineTypeLabel.getEngineType())) {
             engineTypeLabel.setVersion(NEBULA_ENGINE_VERSION.getValue());
         }
 
         String stringValue = engineTypeLabel.getStringValue();
         // The Spark3 switch is enabled and the engine is Spark
-        if(WORKFLOW_SPARK3_SWITCH.getValue() && EngineType.SPARK().toString().equalsIgnoreCase(engineTypeLabel.getEngineType())){
+        if (WORKFLOW_SPARK3_SWITCH.getValue() && EngineType.SPARK().toString().equalsIgnoreCase(engineTypeLabel.getEngineType())) {
 
-            Map<String,Object> variableMap = TaskUtils.getVariableMap(job.getParams());
+            Map<String, Object> variableMap = TaskUtils.getVariableMap(job.getParams());
 
             // If the sparkVersion parameter is 3, use the Spark3 engine version; otherwise use the default Spark engine version
-            if(MapUtils.isNotEmpty(variableMap)
-                    && variableMap.get("sparkVersion")!=null
-                    && StringUtils.startsWithIgnoreCase(variableMap.get("sparkVersion").toString().trim(),"3")){
+            if (MapUtils.isNotEmpty(variableMap)
+                    && variableMap.get("sparkVersion") != null
+                    && StringUtils.equalsIgnoreCase(variableMap.get("sparkVersion").toString().trim(), "3")) {
 
-                EngineTypeLabel spark3EngineType= new EngineTypeLabel();
+                EngineTypeLabel spark3EngineType = new EngineTypeLabel();
                 spark3EngineType.setEngineType(engineTypeLabel.getEngineType());
                 spark3EngineType.setVersion(SPARK3_ENGINE_VERSION.getValue());
                 stringValue = spark3EngineType.getStringValue();
 
-                logger.info("{} job name ,spark3 engineType stringValue is {}",job.getJobName(),stringValue);
+                logger.info("{} job name ,spark3 engineType stringValue is {}", job.getJobName(), stringValue);
 
             }
 
         }
 
-        logger.info("{} job name ,engineType stringValue is {}",job.getJobName(),stringValue);
+        logger.info("{} job name ,engineType stringValue is {}", job.getJobName(), stringValue);
 
         labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, stringValue);
         labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, job.getUser() + "-" + LINKIS_JOB_CREATOR_1_X.getValue(job.getJobProps()));
         labels.put(LabelKeyConstant.CODE_TYPE_KEY, parseRunType(job.getEngineType(), job.getRunType(), job));
 
 
         // Whether to reuse the engine; if not reused, the label value is empty
-        if(!isAppconnJob(job) && !isReuseEngine(job.getParams())){
+        if (!isAppconnJob(job) && !isReuseEngine(job.getParams())) {
             labels.put("executeOnce", "");
         }
         Map<String, Object> paramMapCopy = (HashMap<String, Object>) SerializationUtils.clone(new HashMap<String, Object>(job.getParams()));
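The substantive change in this hunk is the match on sparkVersion: startsWithIgnoreCase accepted any value beginning with "3" (for example "3.1" or "30"), while equalsIgnoreCase now requires the trimmed value to be exactly "3". A minimal standalone sketch of the new behavior, using plain JDK calls in place of the commons-lang3/commons-collections helpers; the class and method names are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    // Standalone sketch of the tightened sparkVersion check; the key
    // "sparkVersion" comes from the diff, everything else is illustrative.
    public class SparkVersionCheckSketch {

        static boolean isSpark3(Map<String, Object> variableMap) {
            if (variableMap == null || variableMap.isEmpty()) {
                return false; // mirrors MapUtils.isNotEmpty(variableMap)
            }
            Object sparkVersion = variableMap.get("sparkVersion");
            return sparkVersion != null
                    && "3".equalsIgnoreCase(sparkVersion.toString().trim());
        }

        public static void main(String[] args) {
            Map<String, Object> vars = new HashMap<>();
            vars.put("sparkVersion", " 3 ");
            System.out.println(isSpark3(vars)); // true: trimmed, exact match
            vars.put("sparkVersion", "3.1");
            System.out.println(isSpark3(vars)); // false now; startsWithIgnoreCase would have matched
            vars.put("sparkVersion", "30");
            System.out.println(isSpark3(vars)); // false now; also matched before this change
        }
    }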
@@ -202,6 +202,7 @@ public JobSubmitAction getSubmitAction(Job job) throws LinkisJobExecutionErrorEx
 
     /**
      * Whether the engine is reused; returns true if reused, false if not
+     *
      * @param params
      * @return
      */
@@ -224,13 +225,13 @@ public boolean isReuseEngine(Map<String, Object> params) {
     /**
      * Whether this is an appconn job
      */
-    private boolean isAppconnJob(Job job){
+    private boolean isAppconnJob(Job job) {
         return APPCONN.equals(job.getEngineType());
     }
 
     /**
      * Custom Spark parameter configuration input, e.g. spark.sql.shuffle.partitions=10. Separate multiple parameters with semicolons.
-     *
+     * <p>
      * If a node specifies a parameter template, the node parameters that duplicate the template must be removed so the template takes precedence over node parameters
      *
      * @param paramMapCopy
@@ -242,7 +243,7 @@ private void replaceSparkConfParams(Map<String, Object> paramMapCopy) throws Lin
         // If a node specifies a parameter template, remove the node parameters that duplicate the template so the template takes precedence over node parameters
         if (startupMap.containsKey("ec.conf.templateId")) {
             logger.info("remove keys in template");
-            logger.info("before remove startup map:{}",startupMap.keySet());
+            logger.info("before remove startup map:{}", startupMap.keySet());
             startupMap.remove("spark.driver.memory");
             startupMap.remove("spark.executor.memory");
             startupMap.remove("spark.executor.cores");
@@ -251,7 +252,7 @@ private void replaceSparkConfParams(Map<String, Object> paramMapCopy) throws Lin
             startupMap.remove("spark.conf");
             startupMap.remove("mapreduce.job.running.map.limit");
             startupMap.remove("mapreduce.job.running.reduce.limit");
-            logger.info("after remove startup map:{}",startupMap.keySet());
+            logger.info("after remove startup map:{}", startupMap.keySet());
         }
         Map<String, Object> configurationMap = TaskUtils.getMap(paramMapCopy, TaskConstant.PARAMS_CONFIGURATION);
         configurationMap.put(TaskConstant.PARAMS_CONFIGURATION_STARTUP, startupMap);
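The two hunks above only reformat log calls, but together they show the template-priority rule from the Javadoc: when a node references a parameter template via ec.conf.templateId, node-level settings that the template also controls are dropped so the template values win. A compact sketch of that rule follows; the key list mirrors the removals visible in the diff and may be incomplete, since the diff hides a few lines between the hunks, and the templateId value is made up:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of the template-priority removal performed in replaceSparkConfParams.
    public class TemplatePrioritySketch {

        // Keys removed when a template is set, as seen in the diff; possibly
        // incomplete because the diff elides lines between the two hunks.
        static final List<String> TEMPLATE_CONTROLLED_KEYS = Arrays.asList(
                "spark.driver.memory",
                "spark.executor.memory",
                "spark.executor.cores",
                "spark.conf",
                "mapreduce.job.running.map.limit",
                "mapreduce.job.running.reduce.limit");

        static void applyTemplatePriority(Map<String, Object> startupMap) {
            if (startupMap.containsKey("ec.conf.templateId")) {
                TEMPLATE_CONTROLLED_KEYS.forEach(startupMap::remove);
            }
        }

        public static void main(String[] args) {
            Map<String, Object> startupMap = new HashMap<>();
            startupMap.put("ec.conf.templateId", "tpl-demo"); // hypothetical id
            startupMap.put("spark.executor.memory", "4g");    // node-level setting
            applyTemplatePriority(startupMap);
            // The template-controlled key is gone; the template value applies instead.
            System.out.println(startupMap.containsKey("spark.executor.memory")); // false
        }
    }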
