Skip to content

Commit e8db082

Browse files
committed
insert sink table by name
1 parent 13395b5 commit e8db082

File tree

3 files changed

+14
-7
lines changed

3 files changed

+14
-7
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
333333
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
334334
StreamExecutionEnvironment.getExecutionEnvironment() :
335335
new MyLocalStreamEnvironment();
336-
336+
env.getConfig().disableClosureCleaner();
337337
env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
338338
Configuration globalJobParameters = new Configuration();
339339
Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.apache.flink.table.calcite.FlinkPlannerImpl;
1212
import org.apache.flink.table.plan.logical.LogicalRelNode;
1313
import org.apache.flink.table.plan.schema.TableSinkTable;
14+
import org.apache.flink.table.plan.schema.TableSourceSinkTable;
15+
import scala.Option;
1416

1517
import java.lang.reflect.Method;
1618

@@ -39,12 +41,17 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw
3941

4042
Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
4143
method.setAccessible(true);
44+
Option sinkTab = (Option)method.invoke(tableEnv, targetTableName);
4245

43-
TableSinkTable targetTable = (TableSinkTable) method.invoke(tableEnv, targetTableName);
44-
String[] fieldNames = targetTable.tableSink().getFieldNames();
46+
if (sinkTab.isEmpty()) {
47+
throw new ValidationException("Sink table " + targetTableName + "not found in flink");
48+
}
4549

46-
Table newTable = null;
50+
TableSourceSinkTable targetTable = (TableSourceSinkTable) sinkTab.get();
51+
TableSinkTable tableSinkTable = (TableSinkTable)targetTable.tableSinkTable().get();
52+
String[] fieldNames = tableSinkTable.tableSink().getFieldNames();
4753

54+
Table newTable = null;
4855
try {
4956
newTable = queryResult.select(String.join(",", fieldNames));
5057
} catch (Exception e) {

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
116116
}
117117
}else if(pollSqlNode.getKind() == AS){
118118
AliasInfo aliasInfo = parseASNode(pollSqlNode);
119-
Table table = tableEnv.sql(aliasInfo.getName());
119+
Table table = tableEnv.sqlQuery(aliasInfo.getName());
120120
tableEnv.registerTable(aliasInfo.getAlias(), table);
121121
localTableCache.put(aliasInfo.getAlias(), table);
122122
}
@@ -613,7 +613,7 @@ public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,
613613
tableEnv.sqlUpdate(pollSqlNode.toString());
614614
}else if(pollSqlNode.getKind() == AS){
615615
AliasInfo aliasInfo = parseASNode(pollSqlNode);
616-
Table table = tableEnv.sql(aliasInfo.getName());
616+
Table table = tableEnv.sqlQuery(aliasInfo.getName());
617617
tableEnv.registerTable(aliasInfo.getAlias(), table);
618618
if(LOG.isInfoEnabled()){
619619
LOG.info("Register Table {} by {}", aliasInfo.getAlias(), aliasInfo.getName());
@@ -740,7 +740,7 @@ private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Tab
740740
fieldNames.add(fieldName);
741741
String fieldType = filed[filed.length - 1 ].trim();
742742
Class fieldClass = ClassUtil.stringConvertClass(fieldType);
743-
Class tableField = table.getSchema().getType(i).get().getTypeClass();
743+
Class tableField = table.getSchema().getFieldType(i).get().getTypeClass();
744744
if (fieldClass == tableField){
745745
continue;
746746
} else {

0 commit comments

Comments
 (0)