@@ -109,6 +109,8 @@ public static void main(String[] args) throws Exception {
109109 String name = options .getName ();
110110 String addJarListStr = options .getAddjar ();
111111 String localSqlPluginPath = options .getLocalSqlPluginPath ();
112+ String remoteSqlPluginPath = options .getRemoteSqlPluginPath ();
113+ String pluginLoadMode = options .getPluginLoadMode ();
112114 String deployMode = options .getMode ();
113115 String confProp = options .getConfProp ();
114116
@@ -141,9 +143,9 @@ public static void main(String[] args) throws Exception {
141143 //register udf
142144 registerUDF (sqlTree , jarURList , tableEnv );
143145 //register table schema
144- registerTable (sqlTree , env , tableEnv , options , sideTableMap , registerTableCache );
146+ registerTable (sqlTree , env , tableEnv , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode , sideTableMap , registerTableCache );
145147
146- sqlTranslation (options , tableEnv ,sqlTree ,sideTableMap ,registerTableCache );
148+ sqlTranslation (localSqlPluginPath , tableEnv ,sqlTree ,sideTableMap ,registerTableCache );
147149
148150 if (env instanceof MyLocalStreamEnvironment ) {
149151 ((MyLocalStreamEnvironment ) env ).setClasspaths (ClassLoaderManager .getClassPath ());
@@ -152,9 +154,9 @@ public static void main(String[] args) throws Exception {
152154 env .execute (name );
153155 }
154156
155- private static void sqlTranslation (Options options , StreamTableEnvironment tableEnv ,SqlTree sqlTree ,Map <String , SideTableInfo > sideTableMap ,Map <String , Table > registerTableCache ) throws Exception {
157+ private static void sqlTranslation (String localSqlPluginPath , StreamTableEnvironment tableEnv ,SqlTree sqlTree ,Map <String , SideTableInfo > sideTableMap ,Map <String , Table > registerTableCache ) throws Exception {
156158 SideSqlExec sideSqlExec = new SideSqlExec ();
157- sideSqlExec .setLocalSqlPluginPath (options . getLocalSqlPluginPath () );
159+ sideSqlExec .setLocalSqlPluginPath (localSqlPluginPath );
158160 for (CreateTmpTableParser .SqlParserResult result : sqlTree .getTmpSqlList ()) {
159161 sideSqlExec .registerTmpTable (result , sideTableMap , tableEnv , registerTableCache );
160162 }
@@ -230,16 +232,16 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTabl
230232 }
231233
232234
233- private static void registerTable (SqlTree sqlTree , StreamExecutionEnvironment env , StreamTableEnvironment tableEnv , Options options ,
234- Map <String , SideTableInfo > sideTableMap , Map <String , Table > registerTableCache ) throws Exception {
235+ private static void registerTable (SqlTree sqlTree , StreamExecutionEnvironment env , StreamTableEnvironment tableEnv , String localSqlPluginPath ,
236+ String remoteSqlPluginPath , String pluginLoadMode , Map <String , SideTableInfo > sideTableMap , Map <String , Table > registerTableCache ) throws Exception {
235237 Set <URL > classPathSet = Sets .newHashSet ();
236238 WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner ();
237239 for (TableInfo tableInfo : sqlTree .getTableInfoMap ().values ()) {
238240
239241 if (tableInfo instanceof SourceTableInfo ) {
240242
241243 SourceTableInfo sourceTableInfo = (SourceTableInfo ) tableInfo ;
242- Table table = StreamSourceFactory .getStreamSource (sourceTableInfo , env , tableEnv , options . getLocalSqlPluginPath () );
244+ Table table = StreamSourceFactory .getStreamSource (sourceTableInfo , env , tableEnv , localSqlPluginPath );
243245 tableEnv .registerTable (sourceTableInfo .getAdaptName (), table );
244246 //Note --- parameter conversion function can not be used inside a function of the type of polymerization
245247 //Create table in which the function is arranged only need adaptation sql
@@ -266,18 +268,18 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
266268 LOG .info ("registe table {} success." , tableInfo .getName ());
267269 }
268270 registerTableCache .put (tableInfo .getName (), regTable );
269- classPathSet .add (buildSourceAndSinkPathByLoadMode (tableInfo .getType (), SourceTableInfo .SOURCE_SUFFIX , options ));
271+ classPathSet .add (buildSourceAndSinkPathByLoadMode (tableInfo .getType (), SourceTableInfo .SOURCE_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode ));
270272 } else if (tableInfo instanceof TargetTableInfo ) {
271273
272- TableSink tableSink = StreamSinkFactory .getTableSink ((TargetTableInfo ) tableInfo , options . getLocalSqlPluginPath () );
274+ TableSink tableSink = StreamSinkFactory .getTableSink ((TargetTableInfo ) tableInfo , localSqlPluginPath );
273275 TypeInformation [] flinkTypes = FlinkUtil .transformTypes (tableInfo .getFieldClasses ());
274276 tableEnv .registerTableSink (tableInfo .getName (), tableInfo .getFields (), flinkTypes , tableSink );
275- classPathSet .add (buildSourceAndSinkPathByLoadMode (tableInfo .getType (), TargetTableInfo .TARGET_SUFFIX , options ));
277+ classPathSet .add (buildSourceAndSinkPathByLoadMode (tableInfo .getType (), TargetTableInfo .TARGET_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode ));
276278 } else if (tableInfo instanceof SideTableInfo ){
277279
278280 String sideOperator = ECacheType .ALL .name ().equals (((SideTableInfo ) tableInfo ).getCacheType ()) ? "all" : "async" ;
279281 sideTableMap .put (tableInfo .getName (), (SideTableInfo ) tableInfo );
280- classPathSet .add (buildSidePathByLoadMode (tableInfo .getType (), sideOperator , SideTableInfo .TARGET_SUFFIX , options ));
282+ classPathSet .add (buildSidePathByLoadMode (tableInfo .getType (), sideOperator , SideTableInfo .TARGET_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode ));
281283 }else {
282284 throw new RuntimeException ("not support table type:" + tableInfo .getType ());
283285 }
@@ -293,20 +295,18 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
293295 }
294296 }
295297
296- private static URL buildSourceAndSinkPathByLoadMode (String type , String suffix , Options options ) throws Exception {
297- String pluginLoadMode = options .getPluginLoadMode ();
298+ private static URL buildSourceAndSinkPathByLoadMode (String type , String suffix , String localSqlPluginPath , String remoteSqlPluginPath , String pluginLoadMode ) throws Exception {
298299 if (StringUtils .equalsIgnoreCase (pluginLoadMode , PluginLoadMode .classpath .name ())) {
299- return PluginUtil .getRemoteJarFilePath (type , suffix , options . getRemoteSqlPluginPath (), options . getLocalSqlPluginPath () );
300+ return PluginUtil .getRemoteJarFilePath (type , suffix , remoteSqlPluginPath , localSqlPluginPath );
300301 }
301- return PluginUtil .getLocalJarFilePath (type , suffix , options . getLocalSqlPluginPath () );
302+ return PluginUtil .getLocalJarFilePath (type , suffix , localSqlPluginPath );
302303 }
303304
304- private static URL buildSidePathByLoadMode (String type , String operator , String suffix , Options options ) throws Exception {
305- String pluginLoadMode = options .getPluginLoadMode ();
305+ private static URL buildSidePathByLoadMode (String type , String operator , String suffix , String localSqlPluginPath , String remoteSqlPluginPath , String pluginLoadMode ) throws Exception {
306306 if (StringUtils .equalsIgnoreCase (pluginLoadMode , PluginLoadMode .classpath .name ())) {
307- return PluginUtil .getRemoteSideJarFilePath (type , operator , suffix , options . getRemoteSqlPluginPath (), options . getLocalSqlPluginPath () );
307+ return PluginUtil .getRemoteSideJarFilePath (type , operator , suffix , remoteSqlPluginPath , localSqlPluginPath );
308308 }
309- return PluginUtil .getLocalSideJarFilePath (type , operator , suffix , options . getLocalSqlPluginPath () );
309+ return PluginUtil .getLocalSideJarFilePath (type , operator , suffix , localSqlPluginPath );
310310 }
311311
312312 private static StreamExecutionEnvironment getStreamExeEnv (Properties confProperties , String deployMode ) throws Exception {
0 commit comments