Skip to content

Commit e68819e

Browse files
committed
Map columns by name when inserting into the sink table
1 parent 2b566e5 commit e68819e

File tree

3 files changed

+61
-40
lines changed

3 files changed

+61
-40
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 7 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,12 @@
2222

2323
import com.dtstack.flink.sql.classloader.DtClassLoader;
2424
import com.dtstack.flink.sql.enums.ECacheType;
25-
import com.dtstack.flink.sql.parser.*;
25+
import com.dtstack.flink.sql.exec.FlinkSQLExec;
26+
import com.dtstack.flink.sql.parser.CreateFuncParser;
27+
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
28+
import com.dtstack.flink.sql.parser.InsertSqlParser;
29+
import com.dtstack.flink.sql.parser.SqlParser;
30+
import com.dtstack.flink.sql.parser.SqlTree;
2631
import com.dtstack.flink.sql.side.SideSqlExec;
2732
import com.dtstack.flink.sql.side.SideTableInfo;
2833
import com.dtstack.flink.sql.table.SourceTableInfo;
@@ -35,7 +40,6 @@
3540
import com.dtstack.flink.sql.util.FlinkUtil;
3641
import com.dtstack.flink.sql.util.PluginUtil;
3742
import org.apache.calcite.config.Lex;
38-
import org.apache.calcite.sql.SqlIdentifier;
3943
import org.apache.calcite.sql.SqlInsert;
4044
import org.apache.calcite.sql.SqlNode;
4145
import org.apache.commons.cli.CommandLine;
@@ -61,12 +65,7 @@
6165
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
6266
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
6367
import org.apache.flink.table.api.Table;
64-
import org.apache.flink.table.api.TableEnvironment;
65-
import org.apache.flink.table.api.TableException;
6668
import org.apache.flink.table.api.java.StreamTableEnvironment;
67-
import org.apache.flink.table.calcite.FlinkPlannerImpl;
68-
import org.apache.flink.table.plan.logical.LogicalRelNode;
69-
import org.apache.flink.table.plan.schema.TableSinkTable;
7069
import org.apache.flink.table.sinks.TableSink;
7170
import org.apache.flink.types.Row;
7271
import org.slf4j.Logger;
@@ -210,7 +209,7 @@ public static void main(String[] args) throws Exception {
210209
//sql-dimensional table contains the dimension table of execution
211210
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
212211
}else{
213-
sqlUpdate(tableEnv, result.getExecSql());
212+
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
214213
if(LOG.isInfoEnabled()){
215214
LOG.info("exec sql: " + result.getExecSql());
216215
}
@@ -228,36 +227,6 @@ public static void main(String[] args) throws Exception {
228227
env.execute(name);
229228
}
230229

231-
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) {
232-
233-
FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
234-
SqlNode insert = planner.parse(stmt);
235-
236-
if (!(insert instanceof SqlInsert)) {
237-
throw new TableException(
238-
"Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.");
239-
}
240-
SqlNode query = ((SqlInsert) insert).getSource();
241-
242-
SqlNode validatedQuery = planner.validate(query);
243-
244-
Table queryResult = new Table(tableEnv, new LogicalRelNode(planner.rel(validatedQuery).rel));
245-
String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
246-
247-
try {
248-
Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
249-
method.setAccessible(true);
250-
251-
TableSinkTable targetTable = (TableSinkTable) method.invoke(tableEnv, targetTableName);
252-
String[] fieldNames = targetTable.tableSink().getFieldNames();
253-
Table newTable = queryResult.select(String.join(",", fieldNames));
254-
// insert query result into sink table
255-
tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
256-
} catch (Exception e) {
257-
e.printStackTrace();
258-
}
259-
}
260-
261230
/**
262231
* This part is just to add classpath for the jar when reading remote execution, and will not submit jar from a local
263232
* @param env
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.dtstack.flink.sql.exec;
2+
3+
import org.apache.calcite.sql.SqlIdentifier;
4+
import org.apache.calcite.sql.SqlInsert;
5+
import org.apache.calcite.sql.SqlNode;
6+
import org.apache.flink.table.api.Table;
7+
import org.apache.flink.table.api.TableEnvironment;
8+
import org.apache.flink.table.api.TableException;
9+
import org.apache.flink.table.api.java.StreamTableEnvironment;
10+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
11+
import org.apache.flink.table.plan.logical.LogicalRelNode;
12+
import org.apache.flink.table.plan.schema.TableSinkTable;
13+
14+
import java.lang.reflect.Method;
15+
16+
/**
17+
* @description: mapping by name when insert into sink table
18+
* @author: maqi
19+
* @create: 2019/08/15 11:09
20+
*/
21+
public class FlinkSQLExec {
22+
23+
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
24+
25+
FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
26+
SqlNode insert = planner.parse(stmt);
27+
28+
if (!(insert instanceof SqlInsert)) {
29+
throw new TableException(
30+
"Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.");
31+
}
32+
SqlNode query = ((SqlInsert) insert).getSource();
33+
34+
SqlNode validatedQuery = planner.validate(query);
35+
36+
Table queryResult = new Table(tableEnv, new LogicalRelNode(planner.rel(validatedQuery).rel));
37+
String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
38+
39+
try {
40+
Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
41+
method.setAccessible(true);
42+
43+
TableSinkTable targetTable = (TableSinkTable) method.invoke(tableEnv, targetTableName);
44+
String[] fieldNames = targetTable.tableSink().getFieldNames();
45+
Table newTable = queryResult.select(String.join(",", fieldNames));
46+
// insert query result into sink table
47+
tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
48+
} catch (Exception e) {
49+
throw e;
50+
}
51+
}
52+
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.dtstack.flink.sql.Main;
2424
import com.dtstack.flink.sql.enums.ECacheType;
25+
import com.dtstack.flink.sql.exec.FlinkSQLExec;
2526
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
2627
import com.dtstack.flink.sql.side.operator.SideAsyncOperator;
2728
import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
@@ -106,8 +107,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
106107
}
107108

108109
if(pollSqlNode.getKind() == INSERT){
109-
// tableEnv.sqlUpdate(pollSqlNode.toString());
110-
Main.sqlUpdate(tableEnv, pollSqlNode.toString());
110+
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
111111
if(LOG.isInfoEnabled()){
112112
LOG.info("exec sql: " + pollSqlNode.toString());
113113
}

0 commit comments

Comments
 (0)