
Commit eacbf12

Merge branch '1.5.0_dev_field_by_name' into 'v1.5.0_dev'
Map fields by name on insert. See merge request !49
2 parents 2736723 + ba287a0 commit eacbf12
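
This merge switches INSERT handling from positional column mapping to name-based mapping: before writing into a registered sink table, the query result is re-projected into the sink's declared field order. As a rough, framework-free sketch of the idea (the class, method, and field names below are made up for illustration and are not part of the commit):

import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of by-name field mapping; the commit itself delegates the
// reordering to Table.select(...) using the sink's declared field names.
public class FieldByNameSketch {

    // Build a projection list in the sink's column order, failing fast when a
    // sink column has no matching field in the query result.
    static String buildSelectList(String[] sinkFieldNames, List<String> queryFieldNames) {
        for (String sinkField : sinkFieldNames) {
            if (!queryFieldNames.contains(sinkField)) {
                throw new IllegalArgumentException("query result has no field named " + sinkField);
            }
        }
        return String.join(",", sinkFieldNames);
    }

    public static void main(String[] args) {
        String[] sinkFields = {"id", "name", "price"};                   // sink schema order
        List<String> queryFields = Arrays.asList("name", "price", "id"); // SELECT output order
        // Positional mapping would write "name" into the "id" column; by-name mapping
        // selects "id,name,price" from the query result before the insert.
        System.out.println(buildSelectList(sinkFields, queryFields));    // prints: id,name,price
    }
}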

3 files changed (+62, -3 lines)
core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 7 additions & 2 deletions
@@ -22,7 +22,12 @@
 
 import com.dtstack.flink.sql.classloader.DtClassLoader;
 import com.dtstack.flink.sql.enums.ECacheType;
-import com.dtstack.flink.sql.parser.*;
+import com.dtstack.flink.sql.exec.FlinkSQLExec;
+import com.dtstack.flink.sql.parser.CreateFuncParser;
+import com.dtstack.flink.sql.parser.CreateTmpTableParser;
+import com.dtstack.flink.sql.parser.InsertSqlParser;
+import com.dtstack.flink.sql.parser.SqlParser;
+import com.dtstack.flink.sql.parser.SqlTree;
 import com.dtstack.flink.sql.side.SideSqlExec;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.table.SourceTableInfo;
@@ -204,7 +209,7 @@ public static void main(String[] args) throws Exception {
             //sql-dimensional table contains the dimension table of execution
             sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
         }else{
-            tableEnv.sqlUpdate(result.getExecSql());
+            FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
             if(LOG.isInfoEnabled()){
                 LOG.info("exec sql: " + result.getExecSql());
             }
core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+package com.dtstack.flink.sql.exec;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.plan.logical.LogicalRelNode;
+import org.apache.flink.table.plan.schema.TableSinkTable;
+
+import java.lang.reflect.Method;
+
+/**
+ * @description: mapping by name when insert into sink table
+ * @author: maqi
+ * @create: 2019/08/15 11:09
+ */
+public class FlinkSQLExec {
+
+    public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
+
+        FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
+        SqlNode insert = planner.parse(stmt);
+
+        if (!(insert instanceof SqlInsert)) {
+            throw new TableException(
+                    "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.");
+        }
+        SqlNode query = ((SqlInsert) insert).getSource();
+
+        SqlNode validatedQuery = planner.validate(query);
+
+        Table queryResult = new Table(tableEnv, new LogicalRelNode(planner.rel(validatedQuery).rel));
+        String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
+
+        try {
+            Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
+            method.setAccessible(true);
+
+            TableSinkTable targetTable = (TableSinkTable) method.invoke(tableEnv, targetTableName);
+            String[] fieldNames = targetTable.tableSink().getFieldNames();
+            Table newTable = queryResult.select(String.join(",", fieldNames));
+            // insert query result into sink table
+            tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+}
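
The class above parses the INSERT statement with Flink's Calcite planner, looks up the sink's declared field names via reflection on TableEnvironment#getTable (hence the setAccessible call), re-selects the query result in that order, and hands the re-projected table to insertInto. A hedged usage sketch follows; the table names and the SQL statement are assumptions, only the FlinkSQLExec.sqlUpdate call itself mirrors the new call sites in Main.java and SideSqlExec.java:

import com.dtstack.flink.sql.exec.FlinkSQLExec;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Hypothetical caller: tableEnv is assumed to already have a source table "orders"
// and a sink table "ordersSink" registered, the way Main.java sets up its tables
// before reaching the call site shown in the diff above.
public class FlinkSQLExecUsageSketch {
    public static void writeByName(StreamTableEnvironment tableEnv) throws Exception {
        // The SELECT column order no longer has to match the sink schema order;
        // FlinkSQLExec re-selects the sink's field names from the query result.
        FlinkSQLExec.sqlUpdate(tableEnv, "INSERT INTO ordersSink SELECT name, id FROM orders");
    }
}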

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 1 deletion
@@ -20,7 +20,9 @@
 
 package com.dtstack.flink.sql.side;
 
+import com.dtstack.flink.sql.Main;
 import com.dtstack.flink.sql.enums.ECacheType;
+import com.dtstack.flink.sql.exec.FlinkSQLExec;
 import com.dtstack.flink.sql.parser.CreateTmpTableParser;
 import com.dtstack.flink.sql.side.operator.SideAsyncOperator;
 import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
@@ -105,7 +107,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
         }
 
         if(pollSqlNode.getKind() == INSERT){
-            tableEnv.sqlUpdate(pollSqlNode.toString());
+            FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
             if(LOG.isInfoEnabled()){
                 LOG.info("exec sql: " + pollSqlNode.toString());
             }
