diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
index abab169d6932..a741c637a19e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
@@ -20,6 +20,8 @@
 import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
@@ -47,6 +49,11 @@ public String identifier() {
     return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE);
   }
 
+  @Override
+  public List<String> outputCollectionNames() {
+    return Arrays.asList("FailedRows", "FailedRowsWithErrors", "errors");
+  }
+
   @Override
   protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
     return new BigQueryWriteSchemaTransform(configuration);
@@ -62,9 +69,10 @@ public static class BigQueryWriteSchemaTransform extends SchemaTransform {
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      if (input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED)) {
+      if (input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED)
+          && configuration.getErrorHandling() == null) {
         return input.apply(new BigQueryFileLoadsSchemaTransformProvider().from(configuration));
-      } else { // UNBOUNDED
+      } else { // UNBOUNDED or error handling specified
         return input.apply(
             new BigQueryStorageWriteApiSchemaTransformProvider().from(configuration));
       }
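
For context, a rough usage sketch of what this routing change affects (not part of the patch): a bounded pipeline that configures error handling now expands to the Storage Write API transform instead of file loads, so the configured error output is actually produced. The Managed API usage, the config keys ("table", "error_handling", "output"), the "input" tag, and the class/table names below are illustrative assumptions, not taken from this PR.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

public class BoundedBigQueryWriteWithErrorHandling {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // A small bounded (batch) input of Beam Rows.
    Schema schema = Schema.builder().addStringField("name").build();
    PCollection<Row> rows =
        pipeline.apply(
            Create.of(Row.withSchema(schema).addValues("a").build()).withRowSchema(schema));

    // Because error handling is configured, expand() now selects the Storage Write API
    // transform even though the input is BOUNDED, so failed rows flow to the configured
    // error output instead of the setting being lost on the file-loads path.
    PCollectionRowTuple.of("input", rows)
        .apply(
            Managed.write(Managed.BIGQUERY)
                .withConfig(
                    ImmutableMap.<String, Object>of(
                        "table", "my-project:my_dataset.my_table",
                        "error_handling", ImmutableMap.of("output", "errors"))));

    pipeline.run().waitUntilFinish();
  }
}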