3535import com .dtstack .flink .sql .util .FlinkUtil ;
3636import com .dtstack .flink .sql .util .PluginUtil ;
3737import org .apache .calcite .config .Lex ;
38+ import org .apache .calcite .sql .SqlIdentifier ;
3839import org .apache .calcite .sql .SqlInsert ;
3940import org .apache .calcite .sql .SqlNode ;
4041import org .apache .commons .cli .CommandLine ;
6061import org .apache .flink .streaming .api .environment .StreamContextEnvironment ;
6162import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
6263import org .apache .flink .table .api .Table ;
64+ import org .apache .flink .table .api .TableEnvironment ;
65+ import org .apache .flink .table .api .TableException ;
6366import org .apache .flink .table .api .java .StreamTableEnvironment ;
67+ import org .apache .flink .table .calcite .FlinkPlannerImpl ;
68+ import org .apache .flink .table .plan .logical .LogicalRelNode ;
69+ import org .apache .flink .table .plan .schema .TableSinkTable ;
6470import org .apache .flink .table .sinks .TableSink ;
6571import org .apache .flink .types .Row ;
6672import org .slf4j .Logger ;
@@ -204,7 +210,7 @@ public static void main(String[] args) throws Exception {
204210 //sql-dimensional table contains the dimension table of execution
205211 sideSqlExec .exec (result .getExecSql (), sideTableMap , tableEnv , registerTableCache );
206212 }else {
207- tableEnv . sqlUpdate (result .getExecSql ());
213+ sqlUpdate (tableEnv , result .getExecSql ());
208214 if (LOG .isInfoEnabled ()){
209215 LOG .info ("exec sql: " + result .getExecSql ());
210216 }
@@ -222,6 +228,36 @@ public static void main(String[] args) throws Exception {
222228 env .execute (name );
223229 }
224230
231+ public static void sqlUpdate (StreamTableEnvironment tableEnv , String stmt ) {
232+
233+ FlinkPlannerImpl planner = new FlinkPlannerImpl (tableEnv .getFrameworkConfig (), tableEnv .getPlanner (), tableEnv .getTypeFactory ());
234+ SqlNode insert = planner .parse (stmt );
235+
236+ if (!(insert instanceof SqlInsert )) {
237+ throw new TableException (
238+ "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT." );
239+ }
240+ SqlNode query = ((SqlInsert ) insert ).getSource ();
241+
242+ SqlNode validatedQuery = planner .validate (query );
243+
244+ Table queryResult = new Table (tableEnv , new LogicalRelNode (planner .rel (validatedQuery ).rel ));
245+ String targetTableName = ((SqlIdentifier ) ((SqlInsert ) insert ).getTargetTable ()).names .get (0 );
246+
247+ try {
248+ Method method = TableEnvironment .class .getDeclaredMethod ("getTable" , String .class );
249+ method .setAccessible (true );
250+
251+ TableSinkTable targetTable = (TableSinkTable ) method .invoke (tableEnv , targetTableName );
252+ String [] fieldNames = targetTable .tableSink ().getFieldNames ();
253+ Table newTable = queryResult .select (String .join ("," , fieldNames ));
254+ // insert query result into sink table
255+ tableEnv .insertInto (newTable , targetTableName , tableEnv .queryConfig ());
256+ } catch (Exception e ) {
257+ e .printStackTrace ();
258+ }
259+ }
260+
225261 /**
226262 * This part is just to add classpath for the jar when reading remote execution, and will not submit jar from a local
227263 * @param env
0 commit comments