Skip to content

Commit ce02036

Browse files
committed
merge 1.8
1 parent 6d32bdf commit ce02036

File tree

2 files changed

+25
-51
lines changed

2 files changed

+25
-51
lines changed

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 24 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -91,25 +91,15 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
9191
((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters);
9292
}
9393

94-
Optional<Integer> envParallelism = getEnvParallelism(confProperties);
95-
if (envParallelism.isPresent()) {
96-
streamEnv.setParallelism(envParallelism.get());
97-
}
98-
99-
Optional<Integer> maxParallelism = getMaxEnvParallelism(confProperties);
100-
if (maxParallelism.isPresent()) {
101-
streamEnv.setMaxParallelism(maxParallelism.get());
102-
}
103-
104-
Optional<Long> bufferTimeoutMillis = getBufferTimeoutMillis(confProperties);
105-
if (bufferTimeoutMillis.isPresent()) {
106-
streamEnv.setBufferTimeout(bufferTimeoutMillis.get());
107-
}
108-
109-
Optional<TimeCharacteristic> streamTimeCharacteristic = getStreamTimeCharacteristic(confProperties);
110-
if (streamTimeCharacteristic.isPresent()) {
111-
streamEnv.setStreamTimeCharacteristic(streamTimeCharacteristic.get());
112-
}
94+
getEnvParallelism(confProperties).ifPresent(streamEnv::setParallelism);
95+
getMaxEnvParallelism(confProperties).ifPresent(streamEnv::setMaxParallelism);
96+
getBufferTimeoutMillis(confProperties).ifPresent(streamEnv::setBufferTimeout);
97+
getStreamTimeCharacteristic(confProperties).ifPresent(streamEnv::setStreamTimeCharacteristic);
98+
getAutoWatermarkInterval(confProperties).ifPresent(op -> {
99+
if (streamEnv.getStreamTimeCharacteristic().equals(TimeCharacteristic.EventTime)) {
100+
streamEnv.getConfig().setAutoWatermarkInterval(op);
101+
}
102+
});
113103

114104
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
115105
ConfigConstrant.failureRate,
@@ -120,32 +110,12 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
120110
// checkpoint config
121111
Optional<Boolean> checkpointingEnabled = isCheckpointingEnabled(confProperties);
122112
if (checkpointingEnabled.get()) {
123-
Optional<Long> checkpointInterval = getCheckpointInterval(confProperties);
124-
streamEnv.enableCheckpointing(checkpointInterval.get());
125-
126-
Optional<CheckpointingMode> checkpointingMode = getCheckpointingMode(confProperties);
127-
if (checkpointingMode.isPresent()) {
128-
streamEnv.getCheckpointConfig().setCheckpointingMode(checkpointingMode.get());
129-
}
130-
Optional<Long> checkpointTimeout = getCheckpointTimeout(confProperties);
131-
if (checkpointTimeout.isPresent()) {
132-
streamEnv.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout.get());
133-
}
134-
135-
Optional<Integer> maxConcurrentCheckpoints = getMaxConcurrentCheckpoints(confProperties);
136-
if (maxConcurrentCheckpoints.isPresent()) {
137-
streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints.get());
138-
}
139-
140-
Optional<CheckpointConfig.ExternalizedCheckpointCleanup> checkpointCleanup = getCheckpointCleanup(confProperties);
141-
if (checkpointCleanup.isPresent()) {
142-
streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(checkpointCleanup.get());
143-
}
144-
145-
Optional<StateBackend> stateBackend = getStateBackend(confProperties);
146-
if (stateBackend.isPresent()) {
147-
streamEnv.setStateBackend(stateBackend.get());
148-
}
113+
getCheckpointInterval(confProperties).ifPresent(streamEnv::enableCheckpointing);
114+
getCheckpointingMode(confProperties).ifPresent(streamEnv.getCheckpointConfig()::setCheckpointingMode);
115+
getCheckpointTimeout(confProperties).ifPresent(streamEnv.getCheckpointConfig()::setCheckpointTimeout);
116+
getMaxConcurrentCheckpoints(confProperties).ifPresent(streamEnv.getCheckpointConfig()::setMaxConcurrentCheckpoints);
117+
getCheckpointCleanup(confProperties).ifPresent(streamEnv.getCheckpointConfig()::enableExternalizedCheckpoints);
118+
getStateBackend(confProperties).ifPresent(streamEnv::setStateBackend);
149119
}
150120
}
151121

@@ -181,6 +151,11 @@ public static Optional<Long> getBufferTimeoutMillis(Properties properties) {
181151
return StringUtils.isNotBlank(mills) ? Optional.of(Long.valueOf(mills)) : Optional.empty();
182152
}
183153

154+
public static Optional<Long> getAutoWatermarkInterval(Properties properties) {
155+
String autoWatermarkInterval = properties.getProperty(ConfigConstrant.AUTO_WATERMARK_INTERVAL_KEY);
156+
return StringUtils.isNotBlank(autoWatermarkInterval) ? Optional.of(Long.valueOf(autoWatermarkInterval)) : Optional.empty();
157+
}
158+
184159
/**
185160
* #ProcessingTime(默认), IngestionTime, EventTime
186161
* @param properties
@@ -191,7 +166,7 @@ public static Optional<TimeCharacteristic> getStreamTimeCharacteristic(Propertie
191166
}
192167
String characteristicStr = properties.getProperty(ConfigConstrant.FLINK_TIME_CHARACTERISTIC_KEY);
193168
Optional<TimeCharacteristic> characteristic = Arrays.stream(TimeCharacteristic.values())
194-
.filter(tc -> !characteristicStr.equalsIgnoreCase(tc.toString())).findAny();
169+
.filter(tc -> characteristicStr.equalsIgnoreCase(tc.toString())).findAny();
195170

196171
if (!characteristic.isPresent()) {
197172
throw new RuntimeException("illegal property :" + ConfigConstrant.FLINK_TIME_CHARACTERISTIC_KEY);
@@ -207,9 +182,9 @@ public static Optional<Boolean> isCheckpointingEnabled(Properties properties) {
207182

208183
public static Optional<Long> getCheckpointInterval(Properties properties) {
209184
// 两个参数主要用来做上层兼容
210-
Long sql_interval = Long.valueOf(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY, "0"));
211-
Long flink_interval = Long.valueOf(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY, "0"));
212-
long checkpointInterval = Math.max(sql_interval, flink_interval);
185+
Long sqlInterval = Long.valueOf(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY, "0"));
186+
Long flinkInterval = Long.valueOf(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY, "0"));
187+
long checkpointInterval = Math.max(sqlInterval, flinkInterval);
213188
return Optional.of(checkpointInterval);
214189
}
215190

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,9 @@ public static String submit(Options launcherOptions, JobGraph jobGraph, Configur
6262
ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil.createClusterSpecification(confProperties);
6363

6464
PerJobClusterClientBuilder perJobClusterClientBuilder = new PerJobClusterClientBuilder();
65-
perJobClusterClientBuilder.init(launcherOptions.getYarnconf(), flinkConfig, confProperties);
65+
perJobClusterClientBuilder.init(launcherOptions.getYarnconf(), confProperties);
6666

6767
String flinkJarPath = launcherOptions.getFlinkJarPath();
68-
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(flinkJarPath, launcherOptions, jobGraph);
6968

7069
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(confProperties, flinkJarPath, launcherOptions, jobGraph, flinkConfig);
7170
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph,true);

0 commit comments

Comments
 (0)