1919
2020import org .apache .commons .lang3 .StringUtils ;
2121
22-
2322import org .apache .calcite .jdbc .CalciteSchema ;
2423import org .apache .calcite .jdbc .JavaTypeFactoryImpl ;
2524import org .apache .calcite .rel .RelNode ;
3837import org .apache .wayang .basic .data .Record ;
3938import org .apache .wayang .core .api .Configuration ;
4039import org .apache .wayang .core .plugin .Plugin ;
40+ import org .apache .wayang .api .utils .Parameters ;
4141import org .apache .wayang .core .api .WayangContext ;
4242import org .apache .wayang .core .util .ReflectionUtils ;
4343import org .apache .wayang .core .plan .wayangplan .WayangPlan ;
4444import org .apache .wayang .java .Java ;
4545import org .apache .wayang .postgres .Postgres ;
4646import org .apache .wayang .spark .Spark ;
47+ import org .apache .commons .cli .*;
48+
49+ import com .google .common .io .Resources ;
4750
4851import org .json .simple .JSONObject ;
4952import org .json .simple .parser .JSONParser ;
5053
54+ import scala .collection .JavaConversions ;
5155import java .io .BufferedWriter ;
5256import java .io .IOException ;
5357import java .nio .file .Files ;
58+ import java .nio .charset .Charset ;
5459import java .nio .file .Paths ;
60+ import java .net .URL ;
5561import java .sql .SQLException ;
5662import java .util .ArrayList ;
63+ import java .util .Arrays ;
5764import java .util .Collection ;
5865import java .util .Properties ;
5966import java .util .concurrent .atomic .AtomicInteger ;
@@ -103,43 +110,46 @@ public static void main(final String[] args) throws Exception {
103110 throw new IllegalArgumentException (
104111 "Usage: ./bin/wayang-submit org.apache.wayang.api.sql.SqlContext <SQL statement path> <JDBC driver> <JDBC URL> <JDBC user> <JDBC password> <Result output path> [platforms...]" );
105112
106- final String queryPath = args [0 ];
107- final String jdbcDriver = args [1 ];
108- final String jdbcUrl = args [2 ];
109- final String jdbcUser = args [3 ];
110- final String jdbcPassword = args [4 ];
111- final String outputPath = args [5 ];
113+ //Specify the named arguments
114+ Options options = new Options ();
115+ options .addOption ("p" , "platforms" , true , "[platforms...]" );
116+ options .addOption ("s" , "schema" , true , "Schema path" );
117+ options .addOption ("q" , "query" , true , "SQL statement path" );
118+ options .addOption ("o" , "outputPath" , true , "Output path" );
119+ options .addOption ("d" , "data" , true , "Data path for file-based schema" );
120+ options .addOption ("c" , "config" , true , "File path for config file" );
121+ options .addOption ("jdbcDriver" , true , "JDBC driver" );
122+ options .addOption ("jdbcUrl" , true , "JDBC URL" );
123+ options .addOption ("jdbcPassword" , true , "JDBC URL" );
124+
125+ CommandLineParser parser = new DefaultParser ();
126+ CommandLine cmd = parser .parse (options , args );
127+
128+ final String queryPath = cmd .getOptionValue ("q" );
129+ final String jdbcDriver = cmd .getOptionValue ("jdbcDriver" );
130+ final String jdbcUrl = cmd .getOptionValue ("jdbcUrl" );
131+ final String jdbcUser = cmd .getOptionValue ("jdbcUser" );
132+ final String jdbcPassword = cmd .getOptionValue ("jdbcPassword" );
133+ final String outputPath = cmd .getOptionValue ("o" );
134+ final String dataPath = cmd .getOptionValue ("d" );
135+ final String schemaPath = cmd .getOptionValue ("s" );
112136
113137 final String query = StringUtils .chop (
114138 Files .readString (Paths .get (queryPath ))
115139 .stripTrailing ());
116140
117141 final String driverPlatform = jdbcDriver .split ("\\ ." )[0 ];
118142
119- final String calciteModel = String .format (
120- "{\r \n " +
121- "\" calcite\" : {\r \n " +
122- " \" version\" : \" 1.0\" ,\n " +
123- " \" defaultSchema\" : \" wayang\" ,\n " +
124- " \" schemas\" : [\n " +
125- " {\n " +
126- " \" name\" : \" postgres\" ,\n " +
127- " \" type\" : \" custom\" ,\n " +
128- " \" factory\" : \" org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory\" ,\n " +
129- " \" operand\" : {\n " +
130- " \" jdbcDriver\" : \" %s\" ,\n " +
131- " \" jdbcUrl\" : \" %s\" ,\n " +
132- " \" jdbcUser\" : \" %s\" ,\n " +
133- " \" jdbcPassword\" : \" %s\" \n " +
134- " }\n " +
135- " }\n " +
136- " ]\n " +
137- "}\r \n " +
138- "}" ,
139- jdbcDriver , jdbcUrl , jdbcUser , jdbcPassword );
140-
141143 final Configuration configuration = new Configuration ();
142- configuration .load (ReflectionUtils .loadResource ("wayang-defaults.properties" ));
144+
145+ if (cmd .hasOption ("c" )) {
146+ configuration .load (cmd .getOptionValue ("c" ));
147+ }
148+
149+ final String calciteModel = Resources .toString (
150+ new URL (schemaPath ),
151+ Charset .defaultCharset ()
152+ );
143153
144154 configuration .setProperty ("wayang.calcite.model" , calciteModel );
145155 configuration .setProperty (String .format ("wayang.%s.jdbc.url" , driverPlatform ), jdbcUrl );
@@ -153,23 +163,8 @@ public static void main(final String[] args) throws Exception {
153163 final SqlContext context = new SqlContext (parseModel ,
154164 List .of (Java .channelConversionPlugin (), Postgres .conversionPlugin ()));
155165
156- for (int i = 6 ; i < args .length ; i ++) {
157- final String platform = args [i ];
158-
159- switch (platform .toLowerCase ()) {
160- case "spark" :
161- context .withPlugin (Spark .basicPlugin ());
162- break ;
163- case "java" :
164- context .withPlugin (Java .basicPlugin ());
165- break ;
166- case "postgres" :
167- context .withPlugin (Postgres .plugin ());
168- break ;
169- default :
170- throw new IllegalArgumentException ("platform not supported " + platform );
171- }
172- }
166+ List <Plugin > plugins = JavaConversions .seqAsJavaList (Parameters .loadPlugins (cmd .getOptionValue ("p" )));
167+ plugins .stream ().forEach (context ::register );
173168
174169 final Collection <Record > result = context .executeSql (query );
175170
0 commit comments