Skip to content

Commit 9f600c7

Browse files
Add support of extra flink configs.
1 parent 1b9aa8d commit 9f600c7

File tree

1 file changed

+15
-3
lines changed
  • streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl

1 file changed

+15
-3
lines changed

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl/ExtraConfigTransform.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,26 @@
1515

1616
package com.webank.wedatasphere.streamis.jobmanager.manager.transform.impl
1717

18-
import com.webank.wedatasphere.streamis.jobmanager.launcher.entity.vo.ConfigKeyVO
18+
import com.webank.wedatasphere.streamis.jobmanager.launcher.entity.vo.{ConfigKeyVO, ConfigRelationVO}
1919
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.entity.LaunchJob
20+
import org.springframework.beans.BeanUtils
2021

21-
import scala.collection.convert.WrapAsScala._
22+
import scala.collection.JavaConverters._
2223

2324

2425
class ExtraConfigTransform extends ResourceConfigTransform {
2526

26-
override protected def transform(config: ConfigKeyVO, job: LaunchJob): LaunchJob = transformConfig(config.getParameterConfig, job)
27+
override protected def transform(config: ConfigKeyVO, job: LaunchJob): LaunchJob = {
28+
val newConfigs = config.getParameterConfig.asScala.map{config =>
29+
val newConfig = new ConfigRelationVO
30+
BeanUtils.copyProperties(config, newConfig)
31+
newConfig.setKey(ExtraConfigTransform.FLINK_CONFIG_PREFIX + config.getKey)
32+
newConfig
33+
}.asJava
34+
transformConfig(newConfigs, job)
35+
}
2736

2837
}
38+
object ExtraConfigTransform {
39+
private val FLINK_CONFIG_PREFIX = "_FLINK_CONFIG_."
40+
}

0 commit comments

Comments
 (0)