 import com.dtstack.flink.sql.classloader.DtClassLoader;
 import com.dtstack.flink.sql.enums.ECacheType;
 import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
-import com.dtstack.flink.sql.options.LauncherOptionParser;
-import com.dtstack.flink.sql.options.LauncherOptions;
 import com.dtstack.flink.sql.parser.*;
 import com.dtstack.flink.sql.side.SideSqlExec;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
 import org.apache.commons.io.Charsets;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
 import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
 import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
 import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
 import org.apache.flink.calcite.shaded.com.google.common.collect.Sets;
 import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.net.URLDecoder;
@@ -98,36 +105,44 @@ public class Main {
 
     public static void main(String[] args) throws Exception {
 
-        LauncherOptionParser optionParser = new LauncherOptionParser(args);
-        LauncherOptions launcherOptions = optionParser.getLauncherOptions();
-
-        String sql = launcherOptions.getSql();
-        String name = launcherOptions.getName();
-        String addJarListStr = launcherOptions.getAddjar();
-        String localSqlPluginPath = launcherOptions.getLocalSqlPluginPath();
-        String remoteSqlPluginPath = launcherOptions.getRemoteSqlPluginPath();
-        String deployMode = launcherOptions.getMode();
-        String confProp = launcherOptions.getConfProp();
+        Options options = new Options();
+        options.addOption("sql", true, "sql config");
+        options.addOption("name", true, "job name");
+        options.addOption("addjar", true, "add jar");
+        options.addOption("localSqlPluginPath", true, "local sql plugin path");
+        options.addOption("remoteSqlPluginPath", true, "remote sql plugin path");
+        options.addOption("confProp", true, "env properties");
+        options.addOption("mode", true, "deploy mode");
+
+        options.addOption("savePointPath", true, "Savepoint restore path");
+        options.addOption("allowNonRestoredState", true, "Flag indicating whether non restored state is allowed if the savepoint");
+
+        CommandLineParser parser = new DefaultParser();
+        CommandLine cl = parser.parse(options, args);
+        String sql = cl.getOptionValue("sql");
+        String name = cl.getOptionValue("name");
+        String addJarListStr = cl.getOptionValue("addjar");
+        String localSqlPluginPath = cl.getOptionValue("localSqlPluginPath");
+        String remoteSqlPluginPath = cl.getOptionValue("remoteSqlPluginPath");
+        String deployMode = cl.getOptionValue("mode");
+        String confProp = cl.getOptionValue("confProp");
+
+        Preconditions.checkNotNull(sql, "parameters of sql is required");
+        Preconditions.checkNotNull(name, "parameters of name is required");
+        Preconditions.checkNotNull(localSqlPluginPath, "parameters of localSqlPluginPath is required");
 
         sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
         SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
-        List<String> addJarFileList = Lists.newArrayList();
 
+        List<String> addJarFileList = Lists.newArrayList();
         if (!Strings.isNullOrEmpty(addJarListStr)){
             addJarListStr = URLDecoder.decode(addJarListStr, Charsets.UTF_8.name());
             addJarFileList = objMapper.readValue(addJarListStr, List.class);
         }
 
         ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
-        DtClassLoader dtClassLoader = new DtClassLoader(new URL[]{}, threadClassLoader);
-        Thread.currentThread().setContextClassLoader(dtClassLoader);
-
-        URLClassLoader parentClassloader;
-        if (!ClusterMode.local.name().equals(deployMode)){
-            parentClassloader = (URLClassLoader) threadClassLoader.getParent();
-        }else {
-            parentClassloader = dtClassLoader;
-        }
+        DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
+        Thread.currentThread().setContextClassLoader(parentClassloader);
 
         confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
         Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
@@ -198,7 +213,7 @@ public static void main(String[] args) throws Exception {
 
         if (env instanceof MyLocalStreamEnvironment) {
             List<URL> urlList = new ArrayList<>();
-            urlList.addAll(Arrays.asList(dtClassLoader.getURLs()));
+            urlList.addAll(Arrays.asList(parentClassloader.getURLs()));
             ((MyLocalStreamEnvironment) env).setClasspaths(urlList);
         }
 
@@ -221,12 +236,6 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
                 contextEnvironment.getClasspaths().add(url);
             }
         }
-        int i = 0;
-        for (URL url : classPathSet){
-            String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
-            env.registerCachedFile(url.getPath(), classFileName, true);
-            i++;
-        }
     }
 
     private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader,
@@ -240,7 +249,6 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLo
             if (classLoader == null) {
                 classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
             }
-            classLoader.loadClass(funcInfo.getClassName());
             FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
                     tableEnv, classLoader);
         }
@@ -265,7 +273,10 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
                 Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
 
                 RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
-                DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
+                DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
+                        .map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
+                        .returns(typeInfo);
+
                 String fields = String.join(",", typeInfo.getFieldNames());
 
                 if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)){
@@ -278,38 +289,61 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
                 Table regTable = tableEnv.fromDataStream(adaptStream, fields);
                 tableEnv.registerTable(tableInfo.getName(), regTable);
                 registerTableCache.put(tableInfo.getName(), regTable);
-                if (StringUtils.isNotBlank(remoteSqlPluginPath)){
-                    classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath));
-                }
+                classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
             } else if (tableInfo instanceof TargetTableInfo) {
 
                 TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
                 TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
                 tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
-                if (StringUtils.isNotBlank(remoteSqlPluginPath)){
-                    classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
-                }
+                classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
             } else if (tableInfo instanceof SideTableInfo){
+
                 String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
                 sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
-                if (StringUtils.isNotBlank(remoteSqlPluginPath)){
-                    classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
-                }
+                classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
             }else {
                 throw new RuntimeException("not support table type:" + tableInfo.getType());
             }
         }
 
         //The plug-in information corresponding to the table is loaded into the classPath env
         addEnvClassPath(env, classPathSet);
+        int i = 0;
+        for (URL url : classPathSet){
+            String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
+            env.registerCachedFile(url.getPath(), classFileName, true);
+            i++;
+        }
     }
 
-    private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
+    private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException, NoSuchMethodException {
         StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
                 StreamExecutionEnvironment.getExecutionEnvironment() :
                 new MyLocalStreamEnvironment();
 
+        env.getConfig().disableClosureCleaner();
         env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
+        Configuration globalJobParameters = new Configuration();
+        Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
+        method.setAccessible(true);
+
+        confProperties.forEach((key, val) -> {
+            try {
+                method.invoke(globalJobParameters, key, val);
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            } catch (InvocationTargetException e) {
+                e.printStackTrace();
+            }
+        });
+
+        ExecutionConfig exeConfig = env.getConfig();
+        if (exeConfig.getGlobalJobParameters() == null){
+            exeConfig.setGlobalJobParameters(globalJobParameters);
+        }else if (exeConfig.getGlobalJobParameters() instanceof Configuration){
+            ((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters);
+        }
+
 
         if (FlinkUtil.getMaxEnvParallelism(confProperties) > 0){
             env.setMaxParallelism(FlinkUtil.getMaxEnvParallelism(confProperties));