2626import io .cdap .cdap .api .data .format .StructuredRecord ;
2727import io .cdap .cdap .api .data .schema .Schema ;
2828import io .cdap .cdap .api .data .schema .Schema .Field ;
29+ import io .cdap .cdap .api .exception .ErrorCategory ;
30+ import io .cdap .cdap .api .exception .ErrorType ;
31+ import io .cdap .cdap .api .exception .ErrorUtils ;
2932import io .cdap .cdap .api .plugin .PluginConfig ;
3033import io .cdap .cdap .etl .api .Emitter ;
3134import io .cdap .cdap .etl .api .FailureCollector ;
@@ -176,16 +179,20 @@ private void init() {
176179 break ;
177180
178181 default :
179- throw new IllegalArgumentException ("Format {} specified is not one of the allowed format. Allowed formats are" +
180- "DEFAULT, EXCEL, MYSQL, RFC4180, Pipe Delimited and Tab Delimited or " +
181- "Custom" );
182+ String error = String .format ("Format %s specified is not one of the allowed format. Allowed formats are " +
183+ "DEFAULT, EXCEL, MYSQL, RFC4180, Pipe Delimited and Tab Delimited or Custom" ,
184+ csvFormatString );
185+ throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ),
186+ error , error , ErrorType .USER , false , null );
182187 }
183188
184189 try {
185190 outSchema = Schema .parseJson (config .schema );
186191 fields = outSchema .getFields ();
187192 } catch (IOException e ) {
188- throw new IllegalArgumentException ("Format of schema specified is invalid. Please check the format." );
193+ String error = "Format of schema specified is invalid. Please check the format." ;
194+ throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ),
195+ error , error , ErrorType .USER , false , null );
189196 }
190197 }
191198
@@ -224,6 +231,13 @@ private StructuredRecord createStructuredRecord(@Nullable CSVRecord record, Stru
224231 } else if (record == null ) {
225232 builder .set (name , null );
226233 } else {
234+ if (i >= record .size ()) {
235+ String error = String .format ("Number of values in the record : %s are not as per output schema. " +
236+ "Expected %d values, but found only %d values" ,
237+ record , fields .size (), record .size ());
238+ throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ),
239+ error , error , ErrorType .USER , false , null );
240+ }
227241 String val = record .get (i );
228242 Schema fieldSchema = field .getSchema ();
229243
@@ -235,10 +249,12 @@ private StructuredRecord createStructuredRecord(@Nullable CSVRecord record, Stru
235249 builder .set (field .getName (), "" );
236250 } else if (!isNullable ) {
237251 // otherwise, error out
238- throw new IllegalArgumentException ( String .format (
252+ String error = String .format (
239253 "Field #%d (named '%s') is of non-nullable type '%s', " +
240254 "but was parsed as an empty string for CSV record '%s'" ,
241- i , field .getName (), field .getSchema ().getType (), record ));
255+ i , field .getName (), field .getSchema ().getType (), record );
256+ throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ),
257+ error , error , ErrorType .USER , false , null );
242258 }
243259 } else {
244260 builder .convertAndSet (field .getName (), val );
0 commit comments