Skip to content

Commit 16893f9

Browse files
committed
ttl设置无效修改
1 parent 905a779 commit 16893f9

File tree

4 files changed

+20
-21
lines changed

4 files changed

+20
-21
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public static void main(String[] args) throws Exception {
127127
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
128128
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
129129
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
130-
StreamTableEnvironment tableEnv = getStreamTableEnv(confProperties, env);
130+
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
131+
StreamQueryConfig queryConfig = getStreamTableEnvTTL(confProperties, tableEnv);
131132

132133
List<URL> jarURList = Lists.newArrayList();
133134
SqlTree sqlTree = SqlParser.parseSql(sql);
@@ -146,7 +147,7 @@ public static void main(String[] args) throws Exception {
146147
//register table schema
147148
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
148149

149-
sqlTranslation(localSqlPluginPath, tableEnv,sqlTree,sideTableMap,registerTableCache);
150+
sqlTranslation(localSqlPluginPath, tableEnv,sqlTree,sideTableMap,registerTableCache, queryConfig);
150151

151152
if(env instanceof MyLocalStreamEnvironment) {
152153
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
@@ -155,7 +156,7 @@ public static void main(String[] args) throws Exception {
155156
env.execute(name);
156157
}
157158

158-
private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache) throws Exception {
159+
private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
159160
SideSqlExec sideSqlExec = new SideSqlExec();
160161
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
161162
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
@@ -185,9 +186,9 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
185186
}
186187
if(isSide){
187188
//sql-dimensional table contains the dimension table of execution
188-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
189+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
189190
}else{
190-
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
191+
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
191192
if(LOG.isInfoEnabled()){
192193
LOG.info("exec sql: " + result.getExecSql());
193194
}
@@ -353,13 +354,11 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
353354
* 获取StreamTableEnvironment并设置相关属性
354355
*
355356
* @param confProperties
356-
* @param env
357+
* @param tableEnv
357358
* @return
358359
*/
359-
private static StreamTableEnvironment getStreamTableEnv(Properties confProperties, StreamExecutionEnvironment env) {
360+
private static StreamQueryConfig getStreamTableEnvTTL(Properties confProperties, StreamTableEnvironment tableEnv) {
360361
confProperties = PropertiesUtils.propertiesTrim(confProperties);
361-
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
362-
FlinkUtil.setTableEnvTTL(confProperties, tableEnv);
363-
return tableEnv;
362+
return FlinkUtil.getTableEnvTTL(confProperties, tableEnv);
364363
}
365364
}

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121
import org.apache.calcite.sql.SqlIdentifier;
2222
import org.apache.calcite.sql.SqlInsert;
2323
import org.apache.calcite.sql.SqlNode;
24-
import org.apache.flink.table.api.Table;
25-
import org.apache.flink.table.api.TableEnvironment;
26-
import org.apache.flink.table.api.TableException;
27-
import org.apache.flink.table.api.ValidationException;
24+
import org.apache.flink.table.api.*;
2825
import org.apache.flink.table.api.java.StreamTableEnvironment;
2926
import org.apache.flink.table.calcite.FlinkPlannerImpl;
3027
import org.apache.flink.table.plan.logical.LogicalRelNode;
@@ -41,7 +38,7 @@
4138
*/
4239
public class FlinkSQLExec {
4340

44-
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
41+
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt, StreamQueryConfig queryConfig) throws Exception {
4542

4643
FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
4744
SqlNode insert = planner.parse(stmt);
@@ -78,7 +75,7 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw
7875
"Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
7976
"TableSink schema: " + String.join(",", fieldNames));
8077
}
81-
82-
tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
78+
StreamQueryConfig config = null == queryConfig ? tableEnv.queryConfig() : queryConfig;
79+
tableEnv.insertInto(newTable, targetTableName, config);
8380
}
8481
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import com.google.common.collect.Lists;
5353
import com.google.common.collect.Maps;
5454
import org.apache.flink.streaming.api.datastream.DataStream;
55+
import org.apache.flink.table.api.StreamQueryConfig;
5556
import org.apache.flink.table.api.Table;
5657
import org.apache.flink.table.api.java.StreamTableEnvironment;
5758
import org.apache.flink.types.Row;
@@ -82,7 +83,7 @@ public class SideSqlExec {
8283
private Map<String, Table> localTableCache = Maps.newHashMap();
8384

8485
public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
85-
Map<String, Table> tableCache)
86+
Map<String, Table> tableCache, StreamQueryConfig queryConfig)
8687
throws Exception {
8788

8889
if(localSqlPluginPath == null){
@@ -115,7 +116,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
115116
if(pollSqlNode.getKind() == INSERT){
116117
System.out.println("----------real exec sql-----------" );
117118
System.out.println(pollSqlNode.toString());
118-
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
119+
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString(), queryConfig);
119120
if(LOG.isInfoEnabled()){
120121
LOG.info("exec sql: " + pollSqlNode.toString());
121122
}

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,8 @@ public static int getEnvParallelism(Properties properties){
278278
* @param tableEnv
279279
* @return
280280
*/
281-
public static void setTableEnvTTL(Properties properties, StreamTableEnvironment tableEnv) {
281+
public static StreamQueryConfig getTableEnvTTL(Properties properties, StreamTableEnvironment tableEnv) {
282+
StreamQueryConfig qConfig = null;
282283
String ttlMintimeStr = properties.getProperty(ConfigConstrant.SQL_TTL_MINTIME);
283284
String ttlMaxtimeStr = properties.getProperty(ConfigConstrant.SQL_TTL_MAXTIME);
284285
if (StringUtils.isNotEmpty(ttlMintimeStr) || StringUtils.isNotEmpty(ttlMaxtimeStr)) {
@@ -295,10 +296,11 @@ public static void setTableEnvTTL(Properties properties, StreamTableEnvironment
295296
ttlMaxtime = getTtlTime(Integer.parseInt(ttlMaxtimeStrMatcher.group(1)), ttlMaxtimeStrMatcher.group(2));
296297
}
297298
if (0L != ttlMintime && 0L != ttlMaxtime) {
298-
StreamQueryConfig qConfig = tableEnv.queryConfig();
299+
qConfig = tableEnv.queryConfig();
299300
qConfig.withIdleStateRetentionTime(Time.milliseconds(ttlMintime), Time.milliseconds(ttlMaxtime));
300301
}
301302
}
303+
return qConfig;
302304
}
303305

304306
/**

0 commit comments

Comments
 (0)