44 */
55package io .github .pastorgl .datacooker .scripting ;
66
7- import com .fasterxml .jackson .databind .ObjectMapper ;
8- import com .fasterxml .jackson .databind .SerializationFeature ;
97import io .github .pastorgl .datacooker .Constants ;
108import io .github .pastorgl .datacooker .Options ;
119import io .github .pastorgl .datacooker .config .Configuration ;
1816import org .antlr .v4 .runtime .tree .TerminalNode ;
1917import org .apache .commons .collections4 .map .ListOrderedMap ;
2018import org .apache .spark .api .java .JavaPairRDD ;
19+ import scala .Function1 ;
2120
2221import java .util .*;
2322import java .util .stream .Collectors ;
@@ -138,13 +137,15 @@ public void interpret(DataContext dataContext) {
138137
139138 if (verbose ) {
140139 System .out .println ("-------------- OPTIONS ---------------" );
141- System .out .println ("Before: " + options );
140+ System .out .println ("Before: " + options + " \n " );
142141 }
143142
143+ int o = 0 ;
144144 for (TDL4 .StatementContext stmt : scriptContext .statement ()) {
145145 if (stmt .options_stmt () != null ) {
146146 if (verbose ) {
147- System .out .println ("Parsed as: " + stmtTokens (stmt ) + "\n " );
147+ o ++;
148+ System .out .println (String .format ("Change %05d parsed as: " , o ) + stmtTokens (stmt ) + "\n " );
148149 }
149150
150151 Map <String , Object > opts = resolveParams (stmt .options_stmt ().params_expr ());
@@ -153,7 +154,11 @@ public void interpret(DataContext dataContext) {
153154 }
154155
155156 if (verbose ) {
156- System .out .println ("After: " + options );
157+ if (o > 0 ) {
158+ System .out .println ("After " + o + " changes: " + options + "\n " );
159+ } else {
160+ System .out .println ("Unchanged\n " );
161+ }
157162 }
158163
159164 this .dataContext = dataContext ;
@@ -173,7 +178,7 @@ private String stmtTokens(ParserRuleContext stmt) {
173178
174179 private void statement (TDL4 .StatementContext stmt ) {
175180 if (verbose ) {
176- System .out .println ("---------- STATEMENT #" + String . format ( " %05d" , ++ stCnt ) + " ----------" );
181+ System .out .printf ("---------- STATEMENT #%05d ----------%n" , ++ stCnt );
177182 System .out .println ("Parsed as: " + stmtTokens (stmt ) + "\n " );
178183 }
179184
@@ -206,6 +211,44 @@ private void statement(TDL4.StatementContext stmt) {
206211 }
207212 }
208213
214+ private String defParams (final Map <String , DefinitionMeta > meta , final Map <String , Object > params ) {
215+ if (meta == null ) {
216+ return "NONE" ;
217+ }
218+
219+ Function1 <Object , String > pretty = (k ) -> {
220+ Object a = params .get (k );
221+ return (a == null ) ? "NULL" : (a .getClass ().isArray () ? Arrays .toString ((Object []) a ) : String .valueOf (a ));
222+ };
223+
224+ List <String > sl = new LinkedList <>();
225+ sl .add (params .size () + " set" );
226+ for (Map .Entry <String , DefinitionMeta > m : meta .entrySet ()) {
227+ DefinitionMeta mm = m .getValue ();
228+ String key = m .getKey ();
229+
230+ if (mm .dynamic ) {
231+ int [] ii = {0 };
232+ params .keySet ().stream ().filter (k -> k .startsWith (key )).forEach (k -> {
233+ sl .add (key + " " + k .substring (key .length ()) + ": " + pretty .apply (k ));
234+ ii [0 ]++;
235+ });
236+ if (ii [0 ] == 0 ) {
237+ sl .add (key + " not set" );
238+ }
239+ } else if (mm .optional ) {
240+ if (params .containsKey (key )) {
241+ sl .add (key + " set to: " + pretty .apply (key ));
242+ } else {
243+ sl .add (key + " defaults to: " + mm .defaults );
244+ }
245+ } else {
246+ sl .add (key + " (required): " + pretty .apply (key ));
247+ }
248+ }
249+ return String .join ("\n \t " , sl );
250+ }
251+
209252 private void create (TDL4 .Create_stmtContext ctx ) {
210253 String inputName = resolveName (ctx .ds_name ().L_IDENTIFIER ());
211254
@@ -244,11 +287,7 @@ private void create(TDL4.Create_stmtContext ctx) {
244287 Map <String , Object > params = resolveParams (funcExpr .params_expr ());
245288
246289 if (verbose ) {
247- try {
248- System .out .println ("CREATE parameters: " + new ObjectMapper ().enable (SerializationFeature .INDENT_OUTPUT ).writeValueAsString (params ) + "\n " );
249- } catch (Exception e ) {
250- throw new RuntimeException (e );
251- }
290+ System .out .println ("CREATE parameters: " + defParams (Adapters .INPUTS .get (inVerb ).meta .definitions , params ) + "\n " );
252291 }
253292
254293 Map <String , StreamInfo > si = dataContext .createDataStreams (inVerb , inputName , path , params , partCount , partitioning );
@@ -384,13 +423,13 @@ private void transform(TDL4.Transform_stmtContext ctx) {
384423 if (verbose ) {
385424 System .out .println ("TRANSFORMing DS " + dsName + ": " + dataContext .streamInfo (dsName ).describe (ut ));
386425 try {
387- System .out .println ("TRANSFORM parameters: " + new ObjectMapper (). enable ( SerializationFeature . INDENT_OUTPUT ). writeValueAsString ( params ) + "\n " );
426+ System .out .println ("TRANSFORM parameters: " + defParams ( meta . definitions , params ) + "\n " );
388427 } catch (Exception e ) {
389428 throw new RuntimeException (e );
390429 }
391430 }
392- StreamInfo si = dataContext .alterDataStream (dsName , converter , columns , keyExpression , Transforms . TRANSFORMS . get ( tfVerb ). meta .keyAfter (), partCount ,
393- new Configuration (Transforms . TRANSFORMS . get ( tfVerb ). meta .definitions , "Transform '" + tfVerb + "'" , params ));
431+ StreamInfo si = dataContext .alterDataStream (dsName , converter , columns , keyExpression , meta .keyAfter (), partCount ,
432+ new Configuration (meta .definitions , "Transform '" + tfVerb + "'" , params ));
394433 if (verbose ) {
395434 System .out .println ("TRANSFORMed DS " + dsName + ": " + si .describe (ut ));
396435 }
@@ -422,7 +461,8 @@ private void copy(TDL4.Copy_stmtContext ctx) {
422461 throw new InvalidConfigurationException ("Storage output adapter \" " + outVerb + "\" isn't present" );
423462 }
424463
425- List <StreamType > types = Arrays .asList (Adapters .OUTPUTS .get (outVerb ).meta .type );
464+ OutputAdapterMeta meta = Adapters .OUTPUTS .get (outVerb ).meta ;
465+ List <StreamType > types = Arrays .asList (meta .type );
426466 for (String dsName : dataStreams ) {
427467 StreamType streamType = dataContext .get (dsName ).streamType ;
428468 if (!types .contains (streamType )) {
@@ -438,7 +478,7 @@ private void copy(TDL4.Copy_stmtContext ctx) {
438478 if (verbose ) {
439479 System .out .println ("COPYing DS " + dataStream + ": " + dataContext .streamInfo (dataStream ).describe (ut ));
440480 try {
441- System .out .println ("COPY parameters: " + new ObjectMapper (). enable ( SerializationFeature . INDENT_OUTPUT ). writeValueAsString ( params ) + "\n " );
481+ System .out .println ("COPY parameters: " + defParams ( meta . definitions , params ) + "\n " );
442482 } catch (Exception e ) {
443483 throw new RuntimeException (e );
444484 }
@@ -1089,7 +1129,7 @@ private void call(TDL4.Call_stmtContext ctx) {
10891129
10901130 if (verbose ) {
10911131 try {
1092- System .out .println ("CALL parameters: " + new ObjectMapper (). enable ( SerializationFeature . INDENT_OUTPUT ). writeValueAsString ( params ) + "\n " );
1132+ System .out .println ("CALL parameters: " + defParams ( meta . definitions , params ) + "\n " );
10931133 } catch (Exception e ) {
10941134 throw new RuntimeException (e );
10951135 }
0 commit comments