2121package com .dtstack .flink .sql ;
2222
2323import com .dtstack .flink .sql .classloader .DtClassLoader ;
24+ import com .dtstack .flink .sql .enums .ClusterMode ;
2425import com .dtstack .flink .sql .enums .ECacheType ;
2526import com .dtstack .flink .sql .environment .MyLocalStreamEnvironment ;
2627import com .dtstack .flink .sql .exec .FlinkSQLExec ;
8283import java .util .Properties ;
8384import java .util .Set ;
8485import java .util .concurrent .TimeUnit ;
86+ import com .dtstack .flink .sql .option .Options ;
8587
8688/**
8789 * Date: 2018/6/26
@@ -106,7 +108,7 @@ public class Main {
106108 public static void main (String [] args ) throws Exception {
107109
108110 OptionParser optionParser = new OptionParser (args );
109- com . dtstack . flink . sql . option . Options options = optionParser .getOptions ();
111+ Options options = optionParser .getOptions ();
110112 String sql = options .getSql ();
111113 String name = options .getName ();
112114 String addJarListStr = options .getAddjar ();
@@ -149,9 +151,20 @@ public static void main(String[] args) throws Exception {
149151 //register table schema
150152 registerTable (sqlTree , env , tableEnv , localSqlPluginPath , remoteSqlPluginPath , sideTableMap , registerTableCache );
151153
152- SideSqlExec sideSqlExec = new SideSqlExec ();
153- sideSqlExec .setLocalSqlPluginPath (localSqlPluginPath );
154+ sqlTranslation (options ,tableEnv ,sqlTree ,sideTableMap ,registerTableCache );
155+
156+ if (env instanceof MyLocalStreamEnvironment ) {
157+ List <URL > urlList = new ArrayList <>();
158+ urlList .addAll (Arrays .asList (parentClassloader .getURLs ()));
159+ ((MyLocalStreamEnvironment ) env ).setClasspaths (urlList );
160+ }
154161
162+ env .execute (name );
163+ }
164+
165+ private static void sqlTranslation (Options options ,StreamTableEnvironment tableEnv ,SqlTree sqlTree ,Map <String , SideTableInfo > sideTableMap ,Map <String , Table > registerTableCache ) throws Exception {
166+ SideSqlExec sideSqlExec = new SideSqlExec ();
167+ sideSqlExec .setLocalSqlPluginPath (options .getLocalSqlPluginPath ());
155168 for (CreateTmpTableParser .SqlParserResult result : sqlTree .getTmpSqlList ()) {
156169 sideSqlExec .registerTmpTable (result , sideTableMap , tableEnv , registerTableCache );
157170 }
@@ -160,9 +173,7 @@ public static void main(String[] args) throws Exception {
160173 if (LOG .isInfoEnabled ()){
161174 LOG .info ("exe-sql:\n " + result .getExecSql ());
162175 }
163-
164176 boolean isSide = false ;
165-
166177 for (String tableName : result .getTargetTableList ()) {
167178 if (sqlTree .getTmpTableMap ().containsKey (tableName )) {
168179 CreateTmpTableParser .SqlParserResult tmp = sqlTree .getTmpTableMap ().get (tableName );
@@ -183,7 +194,6 @@ public static void main(String[] args) throws Exception {
183194 break ;
184195 }
185196 }
186-
187197 if (isSide ){
188198 //sql-dimensional table contains the dimension table of execution
189199 sideSqlExec .exec (result .getExecSql (), sideTableMap , tableEnv , registerTableCache );
@@ -197,15 +207,8 @@ public static void main(String[] args) throws Exception {
197207 }
198208 }
199209
200- if (env instanceof MyLocalStreamEnvironment ) {
201- List <URL > urlList = new ArrayList <>();
202- urlList .addAll (Arrays .asList (parentClassloader .getURLs ()));
203- ((MyLocalStreamEnvironment ) env ).setClasspaths (urlList );
204- }
205210
206- env .execute (name );
207211 }
208-
209212 /**
210213 * This part is just to add classpath for the jar when reading remote execution, and will not submit jar from a local
211214 * @param env
@@ -314,42 +317,32 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
314317 Configuration globalJobParameters = new Configuration ();
315318 Method method = Configuration .class .getDeclaredMethod ("setValueInternal" , String .class , Object .class );
316319 method .setAccessible (true );
317-
318320 confProperties .forEach ((key ,val ) -> {
319321 try {
320322 method .invoke (globalJobParameters , key , val );
321- } catch (IllegalAccessException e ) {
322- e .printStackTrace ();
323- } catch (InvocationTargetException e ) {
324- e .printStackTrace ();
323+ } catch (Exception e ) {
324+ LOG .error ("set Configuration key:{},value:{} error:{}" ,key ,val ,e );
325325 }
326326 });
327-
328327 ExecutionConfig exeConfig = env .getConfig ();
329328 if (exeConfig .getGlobalJobParameters () == null ){
330329 exeConfig .setGlobalJobParameters (globalJobParameters );
331330 }else if (exeConfig .getGlobalJobParameters () instanceof Configuration ){
332331 ((Configuration ) exeConfig .getGlobalJobParameters ()).addAll (globalJobParameters );
333332 }
334-
335-
336333 if (FlinkUtil .getMaxEnvParallelism (confProperties ) > 0 ){
337334 env .setMaxParallelism (FlinkUtil .getMaxEnvParallelism (confProperties ));
338335 }
339-
340336 if (FlinkUtil .getBufferTimeoutMillis (confProperties ) > 0 ){
341337 env .setBufferTimeout (FlinkUtil .getBufferTimeoutMillis (confProperties ));
342338 }
343-
344339 env .setRestartStrategy (RestartStrategies .failureRateRestart (
345340 failureRate ,
346341 Time .of (failureInterval , TimeUnit .MINUTES ),
347342 Time .of (delayInterval , TimeUnit .SECONDS )
348343 ));
349-
350344 FlinkUtil .setStreamTimeCharacteristic (env , confProperties );
351345 FlinkUtil .openCheckpoint (env , confProperties );
352-
353346 return env ;
354347 }
355348}
0 commit comments