Skip to content

Commit 8c5ea02

Browse files
committed
opt stream env code
1 parent 86c9331 commit 8c5ea02

File tree

3 files changed

+205
-179
lines changed

3 files changed

+205
-179
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,6 @@ private static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironme
312312

313313
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
314314
StreamEnvConfigManager.streamTableEnvironmentStateTTLConfig(tableEnv, confProperties);
315-
316315
return tableEnv;
317316
}
318317
}

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

Lines changed: 205 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,25 @@
1919
package com.dtstack.flink.sql.environment;
2020

2121
import com.dtstack.flink.sql.constrant.ConfigConstrant;
22-
import com.dtstack.flink.sql.util.FlinkUtil;
22+
import com.dtstack.flink.sql.enums.EStateBackend;
23+
import com.dtstack.flink.sql.util.MathUtil;
2324
import com.dtstack.flink.sql.util.PropertiesUtils;
25+
import org.apache.commons.lang3.BooleanUtils;
2426
import org.apache.commons.lang3.StringUtils;
2527
import org.apache.flink.api.common.ExecutionConfig;
2628
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2729
import org.apache.flink.api.common.time.Time;
30+
import org.apache.flink.api.java.tuple.Tuple2;
2831
import org.apache.flink.configuration.Configuration;
32+
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
33+
import org.apache.flink.runtime.state.StateBackend;
34+
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
35+
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
36+
import org.apache.flink.streaming.api.CheckpointingMode;
2937
import org.apache.flink.streaming.api.TimeCharacteristic;
38+
import org.apache.flink.streaming.api.environment.CheckpointConfig;
3039
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
40+
import org.apache.flink.table.api.TableConfig;
3141
import org.apache.flink.table.api.TableEnvironment;
3242

3343
import java.io.IOException;
@@ -38,6 +48,8 @@
3848
import java.util.Optional;
3949
import java.util.Properties;
4050
import java.util.concurrent.TimeUnit;
51+
import java.util.regex.Matcher;
52+
import java.util.regex.Pattern;
4153

4254
/**
4355
*
@@ -103,14 +115,55 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
103115
Time.of(ConfigConstrant.delayInterval, TimeUnit.SECONDS)
104116
));
105117

106-
FlinkUtil.openCheckpoint(streamEnv, confProperties);
118+
// checkpoint config
119+
Optional<Boolean> checkpointingEnabled = isCheckpointingEnabled(confProperties);
120+
if (checkpointingEnabled.get()) {
121+
Optional<Long> checkpointInterval = getCheckpointInterval(confProperties);
122+
streamEnv.enableCheckpointing(checkpointInterval.get());
123+
124+
Optional<CheckpointingMode> checkpointingMode = getCheckpointingMode(confProperties);
125+
if (checkpointingMode.isPresent()) {
126+
streamEnv.getCheckpointConfig().setCheckpointingMode(checkpointingMode.get());
127+
}
128+
Optional<Long> checkpointTimeout = getCheckpointTimeout(confProperties);
129+
if (checkpointTimeout.isPresent()) {
130+
streamEnv.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout.get());
131+
}
132+
133+
Optional<Integer> maxConcurrentCheckpoints = getMaxConcurrentCheckpoints(confProperties);
134+
if (maxConcurrentCheckpoints.isPresent()) {
135+
streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints.get());
136+
}
137+
138+
Optional<CheckpointConfig.ExternalizedCheckpointCleanup> checkpointCleanup = getCheckpointCleanup(confProperties);
139+
if (checkpointCleanup.isPresent()) {
140+
streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(checkpointCleanup.get());
141+
}
142+
143+
Optional<StateBackend> stateBackend = getStateBackend(confProperties);
144+
if (stateBackend.isPresent()) {
145+
streamEnv.setStateBackend(stateBackend.get());
146+
}
147+
}
107148
}
108149

150+
/**
151+
* 设置TableEnvironment状态超时时间
152+
* @param tableEnv
153+
* @param confProperties
154+
*/
109155
public static void streamTableEnvironmentStateTTLConfig(TableEnvironment tableEnv, Properties confProperties) {
110156
confProperties = PropertiesUtils.propertiesTrim(confProperties);
111-
FlinkUtil.setTableEnvTTL(confProperties, tableEnv);
157+
Optional<Tuple2<Time, Time>> tableEnvTTL = getTableEnvTTL(confProperties);
158+
if (tableEnvTTL.isPresent()) {
159+
Tuple2<Time, Time> timeRange = tableEnvTTL.get();
160+
TableConfig qConfig = tableEnv.getConfig();
161+
qConfig.setIdleStateRetentionTime(timeRange.f0, timeRange.f1);
162+
}
112163
}
113164

165+
166+
// -----------------------StreamExecutionEnvironment config-----------------------------------------------
114167
public static Optional<Integer> getEnvParallelism(Properties properties) {
115168
String parallelismStr = properties.getProperty(ConfigConstrant.SQL_ENV_PARALLELISM);
116169
return StringUtils.isNotBlank(parallelismStr) ? Optional.of(Integer.valueOf(parallelismStr)) : Optional.empty();
@@ -143,4 +196,153 @@ public static Optional<TimeCharacteristic> getStreamTimeCharacteristic(Propertie
143196
}
144197
return characteristic;
145198
}
199+
200+
public static Optional<Boolean> isCheckpointingEnabled(Properties properties) {
201+
boolean checkpointEnabled = properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY) == null
202+
&& properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY) == null;
203+
return Optional.of(checkpointEnabled);
204+
}
205+
206+
public static Optional<Long> getCheckpointInterval(Properties properties) {
207+
// 两个参数主要用来做上层兼容
208+
Long sql_interval = Long.valueOf(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY, "0"));
209+
Long flink_interval = Long.valueOf(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY, "0"));
210+
long checkpointInterval = Math.max(sql_interval, flink_interval);
211+
return Optional.of(checkpointInterval);
212+
}
213+
214+
public static Optional<CheckpointingMode> getCheckpointingMode(Properties properties) {
215+
String checkpointingModeStr = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_MODE_KEY);
216+
CheckpointingMode checkpointingMode = null;
217+
if (!StringUtils.isEmpty(checkpointingModeStr)) {
218+
checkpointingMode = CheckpointingMode.valueOf(checkpointingModeStr.toUpperCase());
219+
}
220+
return checkpointingMode == null ? Optional.empty() : Optional.of(checkpointingMode);
221+
}
222+
223+
public static Optional<Long> getCheckpointTimeout(Properties properties) {
224+
String checkpointTimeoutStr = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_TIMEOUT_KEY);
225+
226+
if (!StringUtils.isEmpty(checkpointTimeoutStr)) {
227+
Long checkpointTimeout = Long.valueOf(checkpointTimeoutStr);
228+
return Optional.of(checkpointTimeout);
229+
}
230+
return Optional.empty();
231+
}
232+
233+
public static Optional<Integer> getMaxConcurrentCheckpoints(Properties properties) {
234+
String maxConcurrCheckpointsStr = properties.getProperty(ConfigConstrant.FLINK_MAXCONCURRENTCHECKPOINTS_KEY);
235+
if (!StringUtils.isEmpty(maxConcurrCheckpointsStr)) {
236+
Integer maxConcurrCheckpoints = Integer.valueOf(maxConcurrCheckpointsStr);
237+
return Optional.of(maxConcurrCheckpoints);
238+
}
239+
return Optional.empty();
240+
}
241+
242+
public static Optional<CheckpointConfig.ExternalizedCheckpointCleanup> getCheckpointCleanup(Properties properties) {
243+
Boolean sqlCleanMode = MathUtil.getBoolean(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_CLEANUPMODE_KEY), false);
244+
Boolean flinkCleanMode = MathUtil.getBoolean(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_CLEANUPMODE_KEY), false);
245+
246+
CheckpointConfig.ExternalizedCheckpointCleanup externalizedCheckpointCleanup = (sqlCleanMode || flinkCleanMode) ?
247+
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION : CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
248+
return Optional.of(externalizedCheckpointCleanup);
249+
}
250+
251+
public static Optional<StateBackend> getStateBackend(Properties properties) throws IOException {
252+
String backendType = properties.getProperty(ConfigConstrant.STATE_BACKEND_KEY);
253+
String checkpointDataUri = properties.getProperty(ConfigConstrant.CHECKPOINTS_DIRECTORY_KEY);
254+
String backendIncremental = properties.getProperty(ConfigConstrant.STATE_BACKEND_INCREMENTAL_KEY, "true");
255+
256+
if (!StringUtils.isEmpty(backendType)) {
257+
return createStateBackend(backendType, checkpointDataUri, backendIncremental);
258+
}
259+
return Optional.empty();
260+
}
261+
262+
private static Optional<StateBackend> createStateBackend(String backendType, String checkpointDataUri, String backendIncremental) throws IOException {
263+
EStateBackend stateBackendType = EStateBackend.convertFromString(backendType);
264+
StateBackend stateBackend = null;
265+
switch (stateBackendType) {
266+
case MEMORY:
267+
stateBackend = new MemoryStateBackend();
268+
break;
269+
case FILESYSTEM:
270+
checkpointDataUriEmptyCheck(checkpointDataUri, backendType);
271+
stateBackend = new FsStateBackend(checkpointDataUri);
272+
break;
273+
case ROCKSDB:
274+
checkpointDataUriEmptyCheck(checkpointDataUri, backendType);
275+
stateBackend = new RocksDBStateBackend(checkpointDataUri, BooleanUtils.toBoolean(backendIncremental));
276+
break;
277+
}
278+
return stateBackend == null ? Optional.empty() : Optional.of(stateBackend);
279+
}
280+
281+
private static void checkpointDataUriEmptyCheck(String checkpointDataUri, String backendType) {
282+
if (StringUtils.isEmpty(checkpointDataUri)) {
283+
throw new RuntimeException(backendType + " backend checkpointDataUri not null!");
284+
}
285+
}
286+
287+
// -----------------TableEnvironment state ttl config------------------------------
288+
289+
private static final String TTL_PATTERN_STR = "^+?([1-9][0-9]*)([dDhHmMsS])$";
290+
private static final Pattern TTL_PATTERN = Pattern.compile(TTL_PATTERN_STR);
291+
292+
public static Optional<Tuple2<Time, Time>> getTableEnvTTL(Properties properties) {
293+
String ttlMintimeStr = properties.getProperty(ConfigConstrant.SQL_TTL_MINTIME);
294+
String ttlMaxtimeStr = properties.getProperty(ConfigConstrant.SQL_TTL_MAXTIME);
295+
if (StringUtils.isNotEmpty(ttlMintimeStr) || StringUtils.isNotEmpty(ttlMaxtimeStr)) {
296+
verityTtl(ttlMintimeStr, ttlMaxtimeStr);
297+
Matcher ttlMintimeStrMatcher = TTL_PATTERN.matcher(ttlMintimeStr);
298+
Matcher ttlMaxtimeStrMatcher = TTL_PATTERN.matcher(ttlMaxtimeStr);
299+
300+
Long ttlMintime = 0L;
301+
Long ttlMaxtime = 0L;
302+
if (ttlMintimeStrMatcher.find()) {
303+
ttlMintime = getTtlTime(Integer.parseInt(ttlMintimeStrMatcher.group(1)), ttlMintimeStrMatcher.group(2));
304+
}
305+
if (ttlMaxtimeStrMatcher.find()) {
306+
ttlMaxtime = getTtlTime(Integer.parseInt(ttlMaxtimeStrMatcher.group(1)), ttlMaxtimeStrMatcher.group(2));
307+
}
308+
if (0L != ttlMintime && 0L != ttlMaxtime) {
309+
return Optional.of(new Tuple2<>(Time.milliseconds(ttlMintime), Time.milliseconds(ttlMaxtime)));
310+
}
311+
}
312+
return Optional.empty();
313+
}
314+
315+
/**
316+
* ttl 校验
317+
* @param ttlMintimeStr 最小时间
318+
* @param ttlMaxtimeStr 最大时间
319+
*/
320+
private static void verityTtl(String ttlMintimeStr, String ttlMaxtimeStr) {
321+
if (null == ttlMintimeStr
322+
|| null == ttlMaxtimeStr
323+
|| !TTL_PATTERN.matcher(ttlMintimeStr).find()
324+
|| !TTL_PATTERN.matcher(ttlMaxtimeStr).find()) {
325+
throw new RuntimeException("sql.ttl.min 、sql.ttl.max must be set at the same time . example sql.ttl.min=1h,sql.ttl.max=2h");
326+
}
327+
}
328+
329+
/**
330+
* 不同单位时间到毫秒的转换
331+
* @param timeNumber 时间值,如:30
332+
* @param timeUnit 单位,d:天,h:小时,m:分,s:秒
333+
* @return
334+
*/
335+
private static Long getTtlTime(Integer timeNumber, String timeUnit) {
336+
if (timeUnit.equalsIgnoreCase("d")) {
337+
return timeNumber * 1000l * 60 * 60 * 24;
338+
} else if (timeUnit.equalsIgnoreCase("h")) {
339+
return timeNumber * 1000l * 60 * 60;
340+
} else if (timeUnit.equalsIgnoreCase("m")) {
341+
return timeNumber * 1000l * 60;
342+
} else if (timeUnit.equalsIgnoreCase("s")) {
343+
return timeNumber * 1000l;
344+
} else {
345+
throw new RuntimeException("not support " + timeNumber + timeUnit);
346+
}
347+
}
146348
}

0 commit comments

Comments
 (0)