Skip to content

Commit 86c9331

Browse files
committed
extract streamEnvConfigManager
1 parent 1aa2276 commit 86c9331

File tree

3 files changed

+157
-95
lines changed

3 files changed

+157
-95
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 8 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2929
//import com.dtstack.flink.sql.exec.FlinkSQLExec;
3030
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
31+
import com.dtstack.flink.sql.environment.StreamEnvConfigManager;
3132
import com.dtstack.flink.sql.exec.FlinkSQLExec;
3233
import com.dtstack.flink.sql.option.OptionParser;
3334
import com.dtstack.flink.sql.parser.CreateFuncParser;
@@ -129,7 +130,7 @@ public static void main(String[] args) throws Exception {
129130
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
130131
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
131132
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
132-
StreamTableEnvironment tableEnv = getStreamTableEnv(confProperties, env);
133+
StreamTableEnvironment tableEnv = getStreamTableEnv(env, confProperties);
133134

134135
List<URL> jarURList = Lists.newArrayList();
135136
SqlTree sqlTree = SqlParser.parseSql(sql);
@@ -294,59 +295,24 @@ private static URL buildSidePathByLoadMode(String type, String operator, String
294295
}
295296

296297
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
297-
confProperties = PropertiesUtils.propertiesTrim(confProperties);
298-
299298
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
300299
StreamExecutionEnvironment.getExecutionEnvironment() :
301300
new MyLocalStreamEnvironment();
302-
env.getConfig().disableClosureCleaner();
303-
env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
304-
305-
Configuration globalJobParameters = new Configuration();
306-
//Configuration unsupported set properties key-value
307-
Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
308-
method.setAccessible(true);
309-
for (Map.Entry<Object, Object> prop : confProperties.entrySet()) {
310-
method.invoke(globalJobParameters, prop.getKey(), prop.getValue());
311-
}
312301

313-
ExecutionConfig exeConfig = env.getConfig();
314-
if(exeConfig.getGlobalJobParameters() == null){
315-
exeConfig.setGlobalJobParameters(globalJobParameters);
316-
}else if(exeConfig.getGlobalJobParameters() instanceof Configuration){
317-
((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters);
318-
}
319-
if(FlinkUtil.getMaxEnvParallelism(confProperties) > 0){
320-
env.setMaxParallelism(FlinkUtil.getMaxEnvParallelism(confProperties));
321-
}
322-
if(FlinkUtil.getBufferTimeoutMillis(confProperties) > 0){
323-
env.setBufferTimeout(FlinkUtil.getBufferTimeoutMillis(confProperties));
324-
}
325-
env.setRestartStrategy(RestartStrategies.failureRateRestart(
326-
ConfigConstrant.failureRate,
327-
Time.of(ConfigConstrant.failureInterval, TimeUnit.MINUTES),
328-
Time.of(ConfigConstrant.delayInterval, TimeUnit.SECONDS)
329-
));
330-
FlinkUtil.setStreamTimeCharacteristic(env, confProperties);
331-
FlinkUtil.openCheckpoint(env, confProperties);
302+
StreamEnvConfigManager.streamExecutionEnvironmentConfig(env, confProperties);
332303
return env;
333304
}
334305

335-
/**
336-
* 获取StreamTableEnvironment并设置相关属性
337-
*
338-
* @param confProperties
339-
* @return
340-
*/
341-
private static StreamTableEnvironment getStreamTableEnv(Properties confProperties, StreamExecutionEnvironment env) {
306+
private static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironment env, Properties confProperties) {
307+
// use blink and streammode
342308
EnvironmentSettings settings = EnvironmentSettings.newInstance()
343309
.useBlinkPlanner()
344310
.inStreamingMode()
345311
.build();
346-
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
347312

348-
confProperties = PropertiesUtils.propertiesTrim(confProperties);
349-
FlinkUtil.setTableEnvTTL(confProperties, tableEnv);
313+
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
314+
StreamEnvConfigManager.streamTableEnvironmentStateTTLConfig(tableEnv, confProperties);
315+
350316
return tableEnv;
351317
}
352318
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.environment;
20+
21+
import com.dtstack.flink.sql.constrant.ConfigConstrant;
22+
import com.dtstack.flink.sql.util.FlinkUtil;
23+
import com.dtstack.flink.sql.util.PropertiesUtils;
24+
import org.apache.commons.lang3.StringUtils;
25+
import org.apache.flink.api.common.ExecutionConfig;
26+
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
27+
import org.apache.flink.api.common.time.Time;
28+
import org.apache.flink.configuration.Configuration;
29+
import org.apache.flink.streaming.api.TimeCharacteristic;
30+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
31+
import org.apache.flink.table.api.TableEnvironment;
32+
33+
import java.io.IOException;
34+
import java.lang.reflect.InvocationTargetException;
35+
import java.lang.reflect.Method;
36+
import java.util.Arrays;
37+
import java.util.Map;
38+
import java.util.Optional;
39+
import java.util.Properties;
40+
import java.util.concurrent.TimeUnit;
41+
42+
/**
43+
*
44+
* 流执行环境相关配置
45+
* Date: 2019/11/22
46+
* Company: www.dtstack.com
47+
* @author maqi
48+
*/
49+
public final class StreamEnvConfigManager {
50+
private StreamEnvConfigManager() {
51+
throw new AssertionError("Singleton class.");
52+
}
53+
54+
/**
55+
* 配置StreamExecutionEnvironment运行时参数
56+
* @param streamEnv
57+
* @param confProperties
58+
*/
59+
public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment streamEnv, Properties confProperties)
60+
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, IOException {
61+
62+
confProperties = PropertiesUtils.propertiesTrim(confProperties);
63+
streamEnv.getConfig().disableClosureCleaner();
64+
65+
Configuration globalJobParameters = new Configuration();
66+
//Configuration unsupported set properties key-value
67+
Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
68+
method.setAccessible(true);
69+
for (Map.Entry<Object, Object> prop : confProperties.entrySet()) {
70+
method.invoke(globalJobParameters, prop.getKey(), prop.getValue());
71+
}
72+
73+
ExecutionConfig exeConfig = streamEnv.getConfig();
74+
if (exeConfig.getGlobalJobParameters() == null) {
75+
exeConfig.setGlobalJobParameters(globalJobParameters);
76+
} else if (exeConfig.getGlobalJobParameters() instanceof Configuration) {
77+
((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters);
78+
}
79+
80+
Optional<Integer> envParallelism = getEnvParallelism(confProperties);
81+
if (envParallelism.isPresent()) {
82+
streamEnv.setParallelism(envParallelism.get());
83+
}
84+
85+
Optional<Integer> maxParallelism = getMaxEnvParallelism(confProperties);
86+
if (maxParallelism.isPresent()) {
87+
streamEnv.setMaxParallelism(maxParallelism.get());
88+
}
89+
90+
Optional<Long> bufferTimeoutMillis = getBufferTimeoutMillis(confProperties);
91+
if (bufferTimeoutMillis.isPresent()) {
92+
streamEnv.setBufferTimeout(bufferTimeoutMillis.get());
93+
}
94+
95+
Optional<TimeCharacteristic> streamTimeCharacteristic = getStreamTimeCharacteristic(confProperties);
96+
if (streamTimeCharacteristic.isPresent()) {
97+
streamEnv.setStreamTimeCharacteristic(streamTimeCharacteristic.get());
98+
}
99+
100+
streamEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
101+
ConfigConstrant.failureRate,
102+
Time.of(ConfigConstrant.failureInterval, TimeUnit.MINUTES),
103+
Time.of(ConfigConstrant.delayInterval, TimeUnit.SECONDS)
104+
));
105+
106+
FlinkUtil.openCheckpoint(streamEnv, confProperties);
107+
}
108+
109+
public static void streamTableEnvironmentStateTTLConfig(TableEnvironment tableEnv, Properties confProperties) {
110+
confProperties = PropertiesUtils.propertiesTrim(confProperties);
111+
FlinkUtil.setTableEnvTTL(confProperties, tableEnv);
112+
}
113+
114+
public static Optional<Integer> getEnvParallelism(Properties properties) {
115+
String parallelismStr = properties.getProperty(ConfigConstrant.SQL_ENV_PARALLELISM);
116+
return StringUtils.isNotBlank(parallelismStr) ? Optional.of(Integer.valueOf(parallelismStr)) : Optional.empty();
117+
}
118+
119+
public static Optional<Integer> getMaxEnvParallelism(Properties properties) {
120+
String parallelismStr = properties.getProperty(ConfigConstrant.SQL_MAX_ENV_PARALLELISM);
121+
return StringUtils.isNotBlank(parallelismStr) ? Optional.of(Integer.valueOf(parallelismStr)) : Optional.empty();
122+
}
123+
124+
public static Optional<Long> getBufferTimeoutMillis(Properties properties) {
125+
String mills = properties.getProperty(ConfigConstrant.SQL_BUFFER_TIMEOUT_MILLIS);
126+
return StringUtils.isNotBlank(mills) ? Optional.of(Long.valueOf(mills)) : Optional.empty();
127+
}
128+
129+
/**
130+
* #ProcessingTime(默认), IngestionTime, EventTime
131+
* @param properties
132+
*/
133+
public static Optional<TimeCharacteristic> getStreamTimeCharacteristic(Properties properties) {
134+
if (!properties.containsKey(ConfigConstrant.FLINK_TIME_CHARACTERISTIC_KEY)) {
135+
return Optional.empty();
136+
}
137+
String characteristicStr = properties.getProperty(ConfigConstrant.FLINK_TIME_CHARACTERISTIC_KEY);
138+
Optional<TimeCharacteristic> characteristic = Arrays.stream(TimeCharacteristic.values())
139+
.filter(tc -> !characteristicStr.equalsIgnoreCase(tc.toString())).findAny();
140+
141+
if (!characteristic.isPresent()) {
142+
throw new RuntimeException("illegal property :" + ConfigConstrant.FLINK_TIME_CHARACTERISTIC_KEY);
143+
}
144+
return characteristic;
145+
}
146+
}

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 3 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -148,31 +148,6 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
148148

149149
}
150150

151-
/**
152-
* #ProcessingTime(默认),IngestionTime,EventTime
153-
* @param env
154-
* @param properties
155-
*/
156-
public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, Properties properties){
157-
if(!properties.containsKey(ConfigConstrant.FLINK_TIME_CHARACTERISTIC_KEY)){
158-
//走默认值
159-
return;
160-
}
161-
162-
String characteristicStr = properties.getProperty(ConfigConstrant.FLINK_TIME_CHARACTERISTIC_KEY);
163-
Boolean flag = false;
164-
for(TimeCharacteristic tmp : TimeCharacteristic.values()){
165-
if(characteristicStr.equalsIgnoreCase(tmp.toString())){
166-
env.setStreamTimeCharacteristic(tmp);
167-
flag = true;
168-
break;
169-
}
170-
}
171-
172-
if(!flag){
173-
throw new RuntimeException("illegal property :" + ConfigConstrant.FLINK_TIME_CHARACTERISTIC_KEY);
174-
}
175-
}
176151

177152

178153
/**
@@ -262,16 +237,7 @@ public static void registerAggregateUDF(String classPath, String funcName, Table
262237
}
263238
}
264239

265-
/**
266-
*
267-
* FIXME 仅针对sql执行方式,暂时未找到区分设置source,transform,sink 并行度的方式
268-
* 设置job运行的并行度
269-
* @param properties
270-
*/
271-
public static int getEnvParallelism(Properties properties){
272-
String parallelismStr = properties.getProperty(ConfigConstrant.SQL_ENV_PARALLELISM);
273-
return StringUtils.isNotBlank(parallelismStr)?Integer.parseInt(parallelismStr):1;
274-
}
240+
275241

276242
/**
277243
* 设置ttl
@@ -336,25 +302,9 @@ private static Long getTtlTime(Integer timeNumber,String timeUnit) {
336302
}
337303
}
338304

339-
/**
340-
* 最大并发度
341-
* @param properties
342-
* @return
343-
*/
344-
public static int getMaxEnvParallelism(Properties properties){
345-
String parallelismStr = properties.getProperty(ConfigConstrant.SQL_MAX_ENV_PARALLELISM);
346-
return StringUtils.isNotBlank(parallelismStr)?Integer.parseInt(parallelismStr):0;
347-
}
348305

349-
/**
350-
*
351-
* @param properties
352-
* @return
353-
*/
354-
public static long getBufferTimeoutMillis(Properties properties){
355-
String mills = properties.getProperty(ConfigConstrant.SQL_BUFFER_TIMEOUT_MILLIS);
356-
return StringUtils.isNotBlank(mills)?Long.parseLong(mills):0L;
357-
}
306+
307+
358308

359309
public static URLClassLoader loadExtraJar(List<URL> jarURLList, URLClassLoader classLoader) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
360310
for(URL url : jarURLList){

0 commit comments

Comments
 (0)