Skip to content

Commit e0d3a53

Browse files
committed
Make the constraints in transforms be configurable.
1 parent 7b59b19 commit e0d3a53

File tree

2 files changed

+12
-5
lines changed

2 files changed

+12
-5
lines changed

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl/FlinkCheckpointConfigTransform.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import com.webank.wedatasphere.streamis.jobmanager.launcher.job.LaunchJob
2323
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.manager.JobLaunchManager
2424
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.state.Checkpoint
2525
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.impl.FlinkCheckpointConfigTransform.CHECKPOINT_PATH_CONFIG_NAME
26+
import org.apache.linkis.common.conf.CommonVars
2627
import org.apache.linkis.common.utils.Logging
2728

2829
import scala.collection.JavaConverters._
@@ -62,5 +63,5 @@ class FlinkCheckpointConfigTransform extends FlinkConfigTransform with Logging{
6263
}
6364

6465
object FlinkCheckpointConfigTransform{
65-
val CHECKPOINT_PATH_CONFIG_NAME = "state.checkpoints.dir"
66+
private val CHECKPOINT_PATH_CONFIG_NAME = CommonVars("wds.streamis.flink.config.name.checkpoint-path", "state.checkpoints.dir").getValue
6667
}

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/transform/impl/FlinkInternalConfigTransform.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,18 @@ class FlinkInternalConfigTransform extends FlinkConfigTransform {
3232
}
3333

3434
object FlinkInternalConfigTransform {
35+
/**
36+
* Defined in FlinkStreamisConfigDefine.LOG_GATEWAY_ADDRESS of 'flink-streamis-log-collector'
37+
*/
38+
private val LOG_GATEWAY_CONFIG_NAME = CommonVars("wds.streamis.flink.config.name.log-gateway", "stream.log.gateway.address").getValue
3539

36-
private val FLINK_LOG_GATEWAY_CONFIG_NAME = CommonVars("wds.streamis.flink.config.name.log-gateway", "stream.log.gateway.address").getValue
40+
/**
41+
* Defined in FlinkStreamisConfigDefine.LOG_GATEWAY_ADDRESS of 'flink-streamis-log-collector'
42+
*/
43+
private val LOG_COLLECT_PATH_CONFIG_NAME = CommonVars("wds.streamis.flink.config.name.log-collect-path", "stream.log.collect.path").getValue
3744

38-
private val FLINK_LOG_COLLECT_PATH_CONFIG_NAME = CommonVars("wds.streamis.flink.config.name.log-collect-path", "stream.log.collect.path").getValue
3945

40-
val INTERNAL_CONFIG_MAP = Map(JobConf.STREAMIS_JOB_LOG_GATEWAY.key -> FLINK_LOG_GATEWAY_CONFIG_NAME,
41-
JobConf.STREAMIS_JOB_LOG_COLLECT_PATH.key -> FLINK_LOG_COLLECT_PATH_CONFIG_NAME
46+
val INTERNAL_CONFIG_MAP = Map(JobConf.STREAMIS_JOB_LOG_GATEWAY.key -> LOG_GATEWAY_CONFIG_NAME,
47+
JobConf.STREAMIS_JOB_LOG_COLLECT_PATH.key -> LOG_COLLECT_PATH_CONFIG_NAME
4248
)
4349
}

0 commit comments

Comments
 (0)