4747@ RequiredArgsConstructor (access = AccessLevel .PACKAGE )
4848class SqlExecutor {
4949
50- private static final Pattern SET_STATEMENT_PATTERN =
50+ private static final Pattern SET_PATTERN =
5151 Pattern .compile ("SET\\ s+'(\\ S+)'\\ s*=\\ s*'(.+)';?" , Pattern .CASE_INSENSITIVE );
5252
53+ private static final Pattern STATEMENT_SET_PATTERN =
54+ Pattern .compile ("EXECUTE\\ s+STATEMENT\\ s+SET" , Pattern .CASE_INSENSITIVE | Pattern .DOTALL );
55+
5356 private final TableEnvironment tEnv ;
5457
5558 SqlExecutor (Configuration config , @ Nullable String udfPath ) {
@@ -115,8 +118,11 @@ void setupSystemFunctions() {
115118 TableResult executeScript (String script ) {
116119 var statements = SqlUtils .parseStatements (script );
117120 TableResult tableResult = null ;
118- for (String statement : statements ) {
119- tableResult = executeStatement (statement );
121+
122+ var it = statements .iterator ();
123+ while (it .hasNext ()) {
124+ var statement = it .next ();
125+ tableResult = executeStatement (statement , it .hasNext ());
120126 }
121127
122128 return tableResult ;
@@ -141,24 +147,29 @@ TableResult executeCompiledPlan(String planJson) {
141147 }
142148 }
143149
144- private TableResult executeStatement (String statement ) {
150+ private TableResult executeStatement (String statement , boolean intermediate ) {
145151 TableResult tableResult = null ;
146152 try {
147- var setMatcher = SET_STATEMENT_PATTERN .matcher (statement .trim ());
153+ var setMatcher = SET_PATTERN .matcher (statement .trim ());
154+ var statementSetMatcher = STATEMENT_SET_PATTERN .matcher (statement .trim ());
148155
149156 if (setMatcher .matches ()) {
150157 // Handle SET statements
151158 var key = setMatcher .group (1 );
152159 var value = setMatcher .group (2 );
153160 tEnv .getConfig ().getConfiguration ().setString (key , value );
154161 log .info ("Set configuration: {} = {}" , key , value );
162+
155163 } else {
156164 log .info ("Executing statement:\n {}" , statement );
157165 tableResult = tEnv .executeSql (statement );
166+ if (statementSetMatcher .find () && intermediate ) {
167+ log .debug ("Make sure to wait intermediate statement set to finish..." );
168+ tableResult .await ();
169+ }
158170 }
159171 } catch (Exception e ) {
160- e .addSuppressed (new RuntimeException ("Error while executing stmt: " + statement ));
161- throw e ;
172+ throw new RuntimeException ("Error while executing stmt: " + statement , e );
162173 }
163174 return tableResult ;
164175 }
0 commit comments