
Commit 62f53eb

rewrite FlinkSQLExec
1 parent d3882da commit 62f53eb

3 files changed: +100 −91 lines

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 2 deletions
@@ -28,6 +28,7 @@
 import com.dtstack.flink.sql.enums.EPluginLoadMode;
 //import com.dtstack.flink.sql.exec.FlinkSQLExec;
 import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
+import com.dtstack.flink.sql.exec.FlinkSQLExec;
 import com.dtstack.flink.sql.option.OptionParser;
 import com.dtstack.flink.sql.parser.CreateFuncParser;
 import com.dtstack.flink.sql.parser.CreateTmpTableParser;
@@ -188,8 +189,7 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
             //sql-dimensional table contains the dimension table of execution
             sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
         }else{
-            // FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
-            tableEnv.sqlUpdate(result.getExecSql());
+            FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
             if(LOG.isInfoEnabled()){
                 LOG.info("exec sql: " + result.getExecSql());
             }
core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 93 additions & 86 deletions
@@ -1,86 +1,93 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package com.dtstack.flink.sql.exec;
-//
-//import org.apache.calcite.sql.SqlIdentifier;
-//import org.apache.calcite.sql.SqlInsert;
-//import org.apache.calcite.sql.SqlNode;
-//import org.apache.flink.table.api.Table;
-//import org.apache.flink.table.api.TableEnvironment;
-//import org.apache.flink.table.api.TableException;
-//import org.apache.flink.table.api.ValidationException;
-//import org.apache.flink.table.api.java.StreamTableEnvironment;
-//import org.apache.flink.table.calcite.FlinkPlannerImpl;
-//import org.apache.flink.table.plan.logical.LogicalRelNode;
-//import org.apache.flink.table.plan.schema.TableSinkTable;
-//import org.apache.flink.table.plan.schema.TableSourceSinkTable;
-//import org.apache.flink.table.planner.sinks.TableSinkUtils;
-//import scala.Option;
-//
-//import java.lang.reflect.Method;
-//
-///**
-// * @description: mapping by name when insert into sink table
-// * @author: maqi
-// * @create: 2019/08/15 11:09
-// */
-//public class FlinkSQLExec {
-//
-//    public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
-//
-//        FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
-//        SqlNode insert = planner.parse(stmt);
-//
-//        if (!(insert instanceof SqlInsert)) {
-//            throw new TableException(
-//                    "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.");
-//        }
-//        SqlNode query = ((SqlInsert) insert).getSource();
-//
-////        TableSinkUtils.validateSink();
-//        SqlNode validatedQuery = planner.validate(query);
-//
-//        Table queryResult = new Table(tableEnv, new LogicalRelNode(planner.rel(validatedQuery).rel));
-//        String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
-//
-//        Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
-//        method.setAccessible(true);
-//        Option sinkTab = (Option)method.invoke(tableEnv, targetTableName);
-//
-//        if (sinkTab.isEmpty()) {
-//            throw new ValidationException("Sink table " + targetTableName + "not found in flink");
-//        }
-//
-//        TableSourceSinkTable targetTable = (TableSourceSinkTable) sinkTab.get();
-//        TableSinkTable tableSinkTable = (TableSinkTable)targetTable.tableSinkTable().get();
-//        String[] fieldNames = tableSinkTable.tableSink().getFieldNames();
-//
-//        Table newTable = null;
-//        try {
-//            newTable = queryResult.select(String.join(",", fieldNames));
-//        } catch (Exception e) {
-//            throw new ValidationException(
-//                    "Field name of query result and registered TableSink "+targetTableName +" do not match.\n" +
-//                            "Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
-//                            "TableSink schema: " + String.join(",", fieldNames));
-//        }
-//
-//        tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
-//    }
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.exec;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.delegation.StreamPlanner;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.operations.SqlToOperationConverter;
+import org.apache.flink.table.sinks.TableSink;
+import scala.Option;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+
+/**
+ * @description: mapping by name when insert into sink table
+ * @author: maqi
+ * @create: 2019/08/15 11:09
+ */
+public class FlinkSQLExec {
+
+    public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
+        StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
+        StreamPlanner streamPlanner = (StreamPlanner) tableEnvImpl.getPlanner();
+        FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
+
+        RichSqlInsert insert = (RichSqlInsert) flinkPlanner.parse(stmt);
+        TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert);
+
+        String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
+        TableSink tableSink = getTableSinkByPlanner(streamPlanner, targetTableName);
+
+        String[] fieldNames = tableSink.getTableSchema().getFieldNames();
+        Table newTable = null;
+        try {
+            newTable = queryResult.select(String.join(",", fieldNames));
+        } catch (Exception e) {
+            throw new ValidationException(
+                    "Field name of query result and registered TableSink " + targetTableName + " do not match.\n" +
+                            "Query result schema: " + String.join(",", queryResult.getSchema().getFieldNames()) + "\n" +
+                            "TableSink schema: " + String.join(",", fieldNames));
+        }
+        tableEnv.insertInto(newTable, targetTableName);
+    }
+
+    private static TableSink getTableSinkByPlanner(StreamPlanner streamPlanner, String targetTableName)
+            throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+        Method getTableSink = PlannerBase.class.getDeclaredMethod("getTableSink", List.class);
+        getTableSink.setAccessible(true);
+        Option tableSinkOption = (Option) getTableSink.invoke(streamPlanner, Arrays.asList(targetTableName));
+        return (TableSink) tableSinkOption.get();
+    }
+
+    private static TableImpl extractQueryTableFromInsertCaluse(StreamTableEnvironmentImpl tableEnvImpl, FlinkPlannerImpl flinkPlanner, RichSqlInsert insert)
+            throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+
+        PlannerQueryOperation queryOperation = (PlannerQueryOperation) SqlToOperationConverter.convert(flinkPlanner,
+                insert.getSource());
+        Method createTableMethod = TableEnvironmentImpl.class.getDeclaredMethod("createTable", QueryOperation.class);
+        createTableMethod.setAccessible(true);
+        return (TableImpl) createTableMethod.invoke(tableEnvImpl, queryOperation);
+    }
+}
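
For context, a minimal caller sketch (not part of the commit): it assumes a blink-planner streaming environment, since FlinkSQLExec.sqlUpdate casts the environment to StreamTableEnvironmentImpl and its planner to StreamPlanner, and it assumes tables named "sourceTable" and "sinkTable" (illustrative names) are registered elsewhere. The call re-orders the query's columns to match the sink schema by name before delegating to tableEnv.insertInto, instead of relying on positional mapping.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import com.dtstack.flink.sql.exec.FlinkSQLExec;

public class FlinkSQLExecUsage {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Blink planner in streaming mode, matching the planner classes FlinkSQLExec relies on.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // "sourceTable" and "sinkTable" are assumed to be registered on tableEnv elsewhere
        // (e.g. via DDL or registerTableSource/registerTableSink).
        FlinkSQLExec.sqlUpdate(tableEnv, "INSERT INTO sinkTable SELECT id, name FROM sourceTable");

        env.execute("flink-sql-exec-usage");
    }
}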

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 5 additions & 3 deletions
@@ -21,6 +21,7 @@
 package com.dtstack.flink.sql.side;
 
 import com.dtstack.flink.sql.enums.ECacheType;
+import com.dtstack.flink.sql.exec.FlinkSQLExec;
 import com.dtstack.flink.sql.parser.CreateTmpTableParser;
 import com.dtstack.flink.sql.side.operator.SideAsyncOperator;
 import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
@@ -117,8 +118,8 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
             if(pollSqlNode.getKind() == INSERT){
                 System.out.println("----------real exec sql-----------" );
                 System.out.println(pollSqlNode.toString());
-                // FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
-                tableEnv.sqlUpdate(pollSqlNode.toString());
+                FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
+                // tableEnv.sqlUpdate(pollSqlNode.toString());
                 if(LOG.isInfoEnabled()){
                     LOG.info("exec sql: " + pollSqlNode.toString());
                 }
@@ -677,7 +678,8 @@ public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,
             }
 
             if(pollSqlNode.getKind() == INSERT){
-                tableEnv.sqlUpdate(pollSqlNode.toString());
+                // tableEnv.sqlUpdate(pollSqlNode.toString());
+                FlinkSQLExec.sqlUpdate(tableEnv,pollSqlNode.toString());
             }else if(pollSqlNode.getKind() == AS){
                 AliasInfo aliasInfo = parseASNode(pollSqlNode);
                 Table table = tableEnv.sqlQuery(aliasInfo.getName());
