 import com.dtstack.flink.sql.classloader.DtClassLoader;
 import com.dtstack.flink.sql.enums.ECacheType;
 import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
-import com.dtstack.flink.sql.options.LauncherOptionParser;
-import com.dtstack.flink.sql.options.LauncherOptions;
 import com.dtstack.flink.sql.parser.*;
 import com.dtstack.flink.sql.side.SideSqlExec;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
 import org.apache.commons.io.Charsets;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
 import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
 import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
 import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
 import org.apache.flink.calcite.shaded.com.google.common.collect.Sets;
 import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.net.URLDecoder;
@@ -98,21 +105,36 @@ public class Main {

     public static void main(String[] args) throws Exception {

-        LauncherOptionParser optionParser = new LauncherOptionParser(args);
-        LauncherOptions launcherOptions = optionParser.getLauncherOptions();
-
-        String sql = launcherOptions.getSql();
-        String name = launcherOptions.getName();
-        String addJarListStr = launcherOptions.getAddjar();
-        String localSqlPluginPath = launcherOptions.getLocalSqlPluginPath();
-        String remoteSqlPluginPath = launcherOptions.getRemoteSqlPluginPath();
-        String deployMode = launcherOptions.getMode();
-        String confProp = launcherOptions.getConfProp();
+        Options options = new Options();
+        options.addOption("sql", true, "sql config");
+        options.addOption("name", true, "job name");
+        options.addOption("addjar", true, "add jar");
+        options.addOption("localSqlPluginPath", true, "local sql plugin path");
+        options.addOption("remoteSqlPluginPath", true, "remote sql plugin path");
+        options.addOption("confProp", true, "env properties");
+        options.addOption("mode", true, "deploy mode");
+
+        options.addOption("savePointPath", true, "Savepoint restore path");
+        options.addOption("allowNonRestoredState", true, "Flag indicating whether non restored state is allowed if the savepoint");
+
+        CommandLineParser parser = new DefaultParser();
+        CommandLine cl = parser.parse(options, args);
+        String sql = cl.getOptionValue("sql");
+        String name = cl.getOptionValue("name");
+        String addJarListStr = cl.getOptionValue("addjar");
+        String localSqlPluginPath = cl.getOptionValue("localSqlPluginPath");
+        String remoteSqlPluginPath = cl.getOptionValue("remoteSqlPluginPath");
+        String deployMode = cl.getOptionValue("mode");
+        String confProp = cl.getOptionValue("confProp");
+
+        Preconditions.checkNotNull(sql, "parameters of sql is required");
+        Preconditions.checkNotNull(name, "parameters of name is required");
+        Preconditions.checkNotNull(localSqlPluginPath, "parameters of localSqlPluginPath is required");

         sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
         SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
-        List<String> addJarFileList = Lists.newArrayList();

+        List<String> addJarFileList = Lists.newArrayList();
         if (!Strings.isNullOrEmpty(addJarListStr)){
             addJarListStr = URLDecoder.decode(addJarListStr, Charsets.UTF_8.name());
             addJarFileList = objMapper.readValue(addJarListStr, List.class);
@@ -221,12 +243,6 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
                 contextEnvironment.getClasspaths().add(url);
             }
         }
-        int i = 0;
-        for (URL url : classPathSet){
-            String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
-            env.registerCachedFile(url.getPath(), classFileName, true);
-            i++;
-        }
     }

     private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader,
@@ -240,7 +256,6 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLo
             if (classLoader == null) {
                 classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
             }
-            classLoader.loadClass(funcInfo.getClassName());
             FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
                     tableEnv, classLoader);
         }
@@ -265,7 +280,10 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
                 Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);

                 RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
-                DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
+                DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
+                        .map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
+                        .returns(typeInfo);
+
                 String fields = String.join(",", typeInfo.getFieldNames());

                 if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)){
@@ -278,38 +296,60 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
                 Table regTable = tableEnv.fromDataStream(adaptStream, fields);
                 tableEnv.registerTable(tableInfo.getName(), regTable);
                 registerTableCache.put(tableInfo.getName(), regTable);
-                if (StringUtils.isNotBlank(remoteSqlPluginPath)){
-                    classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath));
-                }
+                classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
             } else if (tableInfo instanceof TargetTableInfo) {

                 TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
                 TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
                 tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
-                if (StringUtils.isNotBlank(remoteSqlPluginPath)){
-                    classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
-                }
+                classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
             } else if (tableInfo instanceof SideTableInfo) {
+
                 String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
                 sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
-                if (StringUtils.isNotBlank(remoteSqlPluginPath)){
-                    classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
-                }
+                classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
             } else {
                 throw new RuntimeException("not support table type:" + tableInfo.getType());
             }
         }

         //The plug-in information corresponding to the table is loaded into the classPath env
         addEnvClassPath(env, classPathSet);
+        int i = 0;
+        for (URL url : classPathSet){
+            String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
+            env.registerCachedFile(url.getPath(), classFileName, true);
+            i++;
+        }
     }

-    private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
+    private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException, NoSuchMethodException {
         StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
                 StreamExecutionEnvironment.getExecutionEnvironment() :
                 new MyLocalStreamEnvironment();

         env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
+        Configuration globalJobParameters = new Configuration();
+        Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
+        method.setAccessible(true);
+
+        confProperties.forEach((key, val) -> {
+            try {
+                method.invoke(globalJobParameters, key, val);
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            } catch (InvocationTargetException e) {
+                e.printStackTrace();
+            }
+        });
+
+        ExecutionConfig exeConfig = env.getConfig();
+        if (exeConfig.getGlobalJobParameters() == null){
+            exeConfig.setGlobalJobParameters(globalJobParameters);
+        } else if (exeConfig.getGlobalJobParameters() instanceof Configuration){
+            ((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters);
+        }
+

         if (FlinkUtil.getMaxEnvParallelism(confProperties) > 0){
             env.setMaxParallelism(FlinkUtil.getMaxEnvParallelism(confProperties));
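
Note on the new getStreamExeEnv() logic above: it copies every confProp entry into a flink Configuration and registers it as the job's global parameters, which is what makes those values reachable from user code at runtime. Below is only a minimal illustrative sketch of reading such a parameter back inside a rich function; the class name ReadGlobalParamsFunction and the key "sample.key" are hypothetical and not part of this commit.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical helper (not part of this commit): reads a confProp value that
// getStreamExeEnv() registered as the job's global parameters.
public class ReadGlobalParamsFunction extends RichMapFunction<String, String> {

    // "sample.key" is an illustrative property name, not one defined by flinkStreamSQL
    private String sampleValue = "unset";

    @Override
    public void open(Configuration parameters) throws Exception {
        ExecutionConfig.GlobalJobParameters globalParams =
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        if (globalParams instanceof Configuration) {
            // getStreamExeEnv() stores a flink Configuration, so this cast holds for jobs built by this Main
            sampleValue = ((Configuration) globalParams).getString("sample.key", sampleValue);
        }
    }

    @Override
    public String map(String value) {
        return value + "|" + sampleValue;
    }
}

The reflective call to Configuration.setValueInternal in the diff appears to be a way to store arbitrary Properties entries without choosing among Configuration's typed setters; values stored that way should then be retrievable with getString() as in the sketch.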