Skip to content

Commit 6ad858d

Browse files
committed
merge 1.8dev and modify FlinkSQLExec
1 parent 26fa23e commit 6ad858d

File tree

1 file changed

+39
-6
lines changed

1 file changed

+39
-6
lines changed

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
3535
import org.apache.flink.table.planner.operations.SqlToOperationConverter;
3636
import org.apache.flink.table.sinks.TableSink;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
3739
import scala.Option;
3840

3941
import java.lang.reflect.InvocationTargetException;
@@ -48,7 +50,7 @@
4850
* @create: 2019/08/15 11:09
4951
*/
5052
public class FlinkSQLExec {
51-
53+
private static final Logger LOG = LoggerFactory.getLogger(FlinkSQLExec.class);
5254
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
5355
StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
5456
StreamPlanner streamPlanner = (StreamPlanner)tableEnvImpl.getPlanner();
@@ -60,17 +62,35 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw
6062
String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
6163
TableSink tableSink = getTableSinkByPlanner(streamPlanner, targetTableName);
6264

63-
String[] fieldNames = tableSink.getTableSchema().getFieldNames();
65+
String[] sinkFieldNames = tableSink.getTableSchema().getFieldNames();
66+
String[] queryFieldNames = queryResult.getSchema().getFieldNames();
67+
68+
if (sinkFieldNames.length != queryFieldNames.length) {
69+
throw new ValidationException(
70+
"Field name of query result and registered TableSink " + targetTableName + " do not match.\n" +
71+
"Query result schema: " + String.join(",", queryFieldNames) + "\n" +
72+
"TableSink schema: " + String.join(",", sinkFieldNames));
73+
}
74+
75+
6476
Table newTable = null;
6577
try {
66-
newTable = queryResult.select(String.join(",", fieldNames));
78+
newTable = queryResult.select(String.join(",", sinkFieldNames));
6779
} catch (Exception e) {
6880
throw new ValidationException(
6981
"Field name of query result and registered TableSink "+targetTableName +" do not match.\n" +
70-
"Query result schema: " + String.join(",", queryResult.getSchema().getFieldNames()) + "\n" +
71-
"TableSink schema: " + String.join(",", fieldNames));
82+
"Query result schema: " + String.join(",", queryFieldNames) + "\n" +
83+
"TableSink schema: " + String.join(",", sinkFieldNames));
84+
}
85+
86+
try {
87+
tableEnv.insertInto(newTable, targetTableName);
88+
} catch (Exception e) {
89+
LOG.warn("Field name case of query result and registered TableSink do not match. ", e);
90+
newTable = queryResult.select(String.join(",", ignoreCase(queryFieldNames, sinkFieldNames)));
91+
tableEnv.insertInto(newTable, targetTableName);
7292
}
73-
tableEnv.insertInto(newTable, targetTableName);
93+
7494
}
7595

7696
private static TableSink getTableSinkByPlanner(StreamPlanner streamPlanner, String targetTableName)
@@ -90,4 +110,17 @@ private static TableImpl extractQueryTableFromInsertCaluse(StreamTableEnvironmen
90110
createTableMethod.setAccessible(true);
91111
return (TableImpl) createTableMethod.invoke(tableEnvImpl, queryOperation);
92112
}
113+
114+
private static String[] ignoreCase(String[] queryFieldNames, String[] sinkFieldNames) {
115+
String[] newFieldNames = sinkFieldNames;
116+
for (int i = 0; i < newFieldNames.length; i++) {
117+
for (String queryFieldName : queryFieldNames) {
118+
if (newFieldNames[i].equalsIgnoreCase(queryFieldName)) {
119+
newFieldNames[i] = queryFieldName;
120+
break;
121+
}
122+
}
123+
}
124+
return newFieldNames;
125+
}
93126
}

0 commit comments

Comments
 (0)