Commit a3af5d5

feat(BigQueryWriteSchemaTransformProvider): add output collection names and error handling check (#36937)
Add support for failed-rows output collections and modify the bounded check to also consider the error handling configuration.
1 parent 0676463 commit a3af5d5

File tree

1 file changed: +10 -2 lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java

Lines changed: 10 additions & 2 deletions
@@ -20,6 +20,8 @@
 import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
@@ -47,6 +49,11 @@ public String identifier() {
     return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE);
   }
 
+  @Override
+  public List<String> outputCollectionNames() {
+    return Arrays.asList("FailedRows", "FailedRowsWithErrors", "errors");
+  }
+
   @Override
   protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
     return new BigQueryWriteSchemaTransform(configuration);
@@ -62,9 +69,10 @@ public static class BigQueryWriteSchemaTransform extends SchemaTransform {
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      if (input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED)) {
+      if (input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED)
+          && configuration.getErrorHandling() == null) {
         return input.apply(new BigQueryFileLoadsSchemaTransformProvider().from(configuration));
-      } else { // UNBOUNDED
+      } else { // UNBOUNDED or error handling specified
         return input.apply(
             new BigQueryStorageWriteApiSchemaTransformProvider().from(configuration));
       }
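
For context, below is a minimal usage sketch (not part of the commit) of how the new behavior can surface: with error handling configured, even a bounded write is routed to the Storage Write API path, and failed rows can then be read from the result tuple under one of the newly advertised output names. The Managed API call and the config keys ("table", "error_handling", "output") are assumptions about the managed BigQuery sink's configuration and may differ from the actual key names; only getErrorHandling() and the collection names "FailedRows", "FailedRowsWithErrors", and "errors" come from this commit.

// Usage sketch (assumptions noted above): write rows with error handling
// enabled and consume the failed-rows output advertised by outputCollectionNames().
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

public class BigQueryFailedRowsSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    Schema schema = Schema.builder().addStringField("name").build();
    PCollection<Row> rows =
        pipeline.apply(
            Create.of(Row.withSchema(schema).addValue("alice").build())
                .withRowSchema(schema));

    // Assumed config keys; with error handling set, the bounded input is now
    // routed to the Storage Write API transform instead of file loads.
    PCollectionRowTuple result =
        PCollectionRowTuple.of("input", rows)
            .apply(
                Managed.write(Managed.BIGQUERY)
                    .withConfig(
                        Map.of(
                            "table", "my-project.my_dataset.my_table",
                            "error_handling", Map.of("output", "FailedRowsWithErrors"))));

    // "FailedRows", "FailedRowsWithErrors", and "errors" are the names this
    // commit advertises; which ones are present depends on the configuration.
    if (result.has("FailedRowsWithErrors")) {
      PCollection<Row> failedRows = result.get("FailedRowsWithErrors");
      // ... inspect, log, or persist failedRows
    }

    pipeline.run().waitUntilFinish();
  }
}

Gating on getErrorHandling() == null keeps the cheaper file-loads path as the default for bounded inputs while still letting batch pipelines opt into dead-letter handling via the Storage Write API.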
