@@ -89,12 +89,13 @@ public BigQuerySink(BigQuerySinkConfig config) {
8989
9090 @ Override
9191 public void configurePipeline (PipelineConfigurer pipelineConfigurer ) {
92+ config .validate (pipelineConfigurer .getStageConfigurer ().getInputSchema ());
9293 super .configurePipeline (pipelineConfigurer );
93- config .validate ();
9494 }
9595
9696 @ Override
9797 public void prepareRun (BatchSinkContext context ) throws Exception {
98+ config .validate (context .getInputSchema ());
9899 BigQuery bigquery = BigQueryUtils .getBigQuery (config .getServiceAccountFilePath (), config .getProject ());
99100 // create dataset if it does not exist
100101 if (bigquery .getDataset (config .getDataset ()) == null ) {
@@ -105,6 +106,7 @@ public void prepareRun(BatchSinkContext context) throws Exception {
105106 }
106107 }
107108
109+ // Validate the output schema against the existing BigQuery table schema
108110 validateSchema ();
109111
110112 uuid = UUID .randomUUID ();
@@ -155,10 +157,12 @@ public void initialize(BatchRuntimeContext context) throws Exception {
155157
156158 @ Override
157159 public void transform (StructuredRecord input , Emitter <KeyValue <JsonObject , NullWritable >> emitter ) throws Exception {
158- List <Schema .Field > fields = config .getSchema ().getFields ();
159160 JsonObject object = new JsonObject ();
160- for (Schema .Field field : fields ) {
161- decodeSimpleTypes (object , field .getName (), input );
161+ for (Schema .Field recordField : input .getSchema ().getFields ()) {
162+ // Of all the fields in the input record, decode only those that are also present in the output schema
163+ if (schema .getField (recordField .getName ()) != null ) {
164+ decodeSimpleTypes (object , recordField .getName (), input );
165+ }
162166 }
163167 emitter .emit (new KeyValue <>(object , NullWritable .get ()));
164168 }
@@ -339,20 +343,8 @@ private void validateSchema() throws IOException {
339343
340344 // Match output schema field type with bigquery column type
341345 for (Schema .Field field : config .getSchema ().getFields ()) {
342- validateSimpleTypes (field );
343346 BigQueryUtils .validateFieldSchemaMatches (bqFields .get (field .getName ()),
344347 field , config .getDataset (), config .getTable ());
345348 }
346349 }
347-
348- private void validateSimpleTypes (Schema .Field field ) {
349- String name = field .getName ();
350- Schema fieldSchema = BigQueryUtils .getNonNullableSchema (field .getSchema ());
351- Schema .Type type = fieldSchema .getType ();
352-
353- // Complex types like arrays, maps and unions are not supported in BigQuery plugins.
354- if (!type .isSimpleType ()) {
355- throw new IllegalArgumentException (String .format ("Field '%s' is of unsupported type '%s'." , name , type ));
356- }
357- }
358350}
0 commit comments