diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertError.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertError.java
index 3f9c6068e0a2..da8961ca3f48 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertError.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertError.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import javax.annotation.Nullable;
 
@@ -25,13 +26,21 @@ public class BigQueryStorageApiInsertError {
 
   private @Nullable String errorMessage;
 
+  private @Nullable TableReference table;
+
   public BigQueryStorageApiInsertError(TableRow row) {
-    this.row = row;
+    this(row, null, null);
   }
 
   public BigQueryStorageApiInsertError(TableRow row, @Nullable String errorMessage) {
+    this(row, errorMessage, null);
+  }
+
+  public BigQueryStorageApiInsertError(
+      TableRow row, @Nullable String errorMessage, @Nullable TableReference table) {
     this.row = row;
     this.errorMessage = errorMessage;
+    this.table = table;
   }
 
   public TableRow getRow() {
@@ -43,6 +52,11 @@ public String getErrorMessage() {
     return errorMessage;
   }
 
+  @Nullable
+  public TableReference getTable() {
+    return table;
+  }
+
   @Override
   public String toString() {
     return "BigQueryStorageApiInsertError{"
@@ -51,6 +65,8 @@ public String toString() {
         + ", errorMessage='"
         + errorMessage
         + '\''
+        + ", table="
+        + table
         + '}';
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertErrorCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertErrorCoder.java
index f289ef14290f..412a07bd2fd8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertErrorCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertErrorCoder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import java.io.IOException;
 import java.io.InputStream;
@@ -42,12 +43,18 @@ public void encode(BigQueryStorageApiInsertError value, OutputStream outStream)
       throws IOException {
     TABLE_ROW_CODER.encode(value.getRow(), outStream);
     STRING_CODER.encode(value.getErrorMessage(), outStream);
+    TableReference table = value.getTable();
+    String tableSpec = table != null ? BigQueryHelpers.toTableSpec(table) : null;
+    STRING_CODER.encode(tableSpec, outStream);
   }
 
   @Override
   public BigQueryStorageApiInsertError decode(InputStream inStream)
       throws CoderException, IOException {
-    return new BigQueryStorageApiInsertError(
-        TABLE_ROW_CODER.decode(inStream), STRING_CODER.decode(inStream));
+    TableRow row = TABLE_ROW_CODER.decode(inStream);
+    String errorMessage = STRING_CODER.decode(inStream);
+    String tableSpec = STRING_CODER.decode(inStream);
+    TableReference table = tableSpec != null ? BigQueryHelpers.parseTableSpec(tableSpec) : null;
+    return new BigQueryStorageApiInsertError(row, errorMessage, table);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
index 0c6f82b9df81..e62429cf0f30 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
@@ -19,6 +19,7 @@
 
 import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
 
+import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
 import java.io.IOException;
 import org.apache.beam.sdk.coders.Coder;
@@ -186,10 +187,15 @@ public void processElement(
         badRecordRouter.route(o, element, elementCoder, e, "Unable to convert value to TableRow");
         return;
       }
+      TableReference tableReference = null;
+      TableDestination tableDestination = dynamicDestinations.getTable(element.getKey());
+      if (tableDestination != null) {
+        tableReference = tableDestination.getTableReference();
+      }
       o.get(failedWritesTag)
           .output(
               new BigQueryStorageApiInsertError(
-                  failsafeTableRow, conversionException.toString()));
+                  failsafeTableRow, conversionException.toString(), tableReference));
     } catch (Exception e) {
       badRecordRouter.route(
           o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index f6d10b47ccf2..41bf06d7af23 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -608,7 +608,8 @@ void addMessage(
         org.joda.time.Instant timestamp = payload.getTimestamp();
         rowsSentToFailedRowsCollection.inc();
         failedRowsReceiver.outputWithTimestamp(
-            new BigQueryStorageApiInsertError(tableRow, e.toString()),
+            new BigQueryStorageApiInsertError(
+                tableRow, e.toString(), tableDestination.getTableReference()),
             timestamp != null ? timestamp : elementTs);
         return;
       }
@@ -668,7 +669,9 @@ long flush(
             org.joda.time.Instant timestamp = insertTimestamps.get(i);
             failedRowsReceiver.outputWithTimestamp(
                 new BigQueryStorageApiInsertError(
-                    failedRow, "Row payload too large. Maximum size " + maxRequestSize),
+                    failedRow,
+                    "Row payload too large. Maximum size " + maxRequestSize,
+                    tableDestination.getTableReference()),
                 timestamp);
           }
           int numRowsFailed = inserts.getSerializedRowsCount();
@@ -753,7 +756,9 @@ long flush(
                   }
                   element =
                       new BigQueryStorageApiInsertError(
-                          failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
+                          failedRow,
+                          error.getRowIndexToErrorMessage().get(failedIndex),
+                          tableDestination.getTableReference());
                 } catch (Exception e) {
                   LOG.error("Failed to insert row and could not parse the result!", e);
                 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 0ec88897e257..03a5924cacb3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -21,6 +21,7 @@
 
 import com.google.api.core.ApiFuture;
 import com.google.api.core.ApiFutures;
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
@@ -481,6 +482,7 @@ public void process(
               });
       final String tableId = tableDestination.getTableUrn(bigQueryOptions);
       final String shortTableId = tableDestination.getShortTableUrn();
+      final TableReference tableReference = tableDestination.getTableReference();
       final DatasetService datasetService = getDatasetService(pipelineOptions);
       final WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions);
 
@@ -619,7 +621,8 @@ public void process(
           (failedRow, errorMessage) -> {
             o.get(failedRowsTag)
                 .outputWithTimestamp(
-                    new BigQueryStorageApiInsertError(failedRow.getValue(), errorMessage),
+                    new BigQueryStorageApiInsertError(
+                        failedRow.getValue(), errorMessage, tableReference),
                     failedRow.getTimestamp());
             rowsSentToFailedRowsCollection.inc();
             BigQuerySinkMetrics.appendRowsRowStatusCounter(
@@ -739,7 +742,9 @@ public void process(
               o.get(failedRowsTag)
                   .outputWithTimestamp(
                       new BigQueryStorageApiInsertError(
-                          failedRow, error.getRowIndexToErrorMessage().get(failedIndex)),
+                          failedRow,
+                          error.getRowIndexToErrorMessage().get(failedIndex),
+                          tableReference),
                       timestamp);
             }
             int failedRows = failedRowIndices.size();
@@ -910,7 +915,9 @@ public void process(
             o.get(failedRowsTag)
                 .outputWithTimestamp(
                     new BigQueryStorageApiInsertError(
-                        failedRow, "Row payload too large. Maximum size " + maxRequestSize),
+                        failedRow,
+                        "Row payload too large. Maximum size " + maxRequestSize,
+                        tableReference),
                     timestamp);
           }
           int numRowsFailed = splitValue.getProtoRows().getSerializedRowsCount();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertErrorCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertErrorCoderTest.java
new file mode 100644
index 000000000000..766016058d1a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertErrorCoderTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import org.apache.beam.sdk.coders.Coder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test case for {@link BigQueryStorageApiInsertErrorCoder}. */
+@RunWith(JUnit4.class)
+public class BigQueryStorageApiInsertErrorCoderTest {
+
+  private static final Coder<BigQueryStorageApiInsertError> TEST_CODER =
+      BigQueryStorageApiInsertErrorCoder.of();
+
+  @Test
+  public void testDecodeEncodeEqual() throws Exception {
+    TableRow row = new TableRow().set("field1", "value1").set("field2", 123);
+    BigQueryStorageApiInsertError value =
+        new BigQueryStorageApiInsertError(
+            row,
+            "An error message",
+            new TableReference()
+                .setProjectId("dummy-project-id")
+                .setDatasetId("dummy-dataset-id")
+                .setTableId("dummy-table-id"));
+
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+    TEST_CODER.encode(value, outStream);
+
+    ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
+    BigQueryStorageApiInsertError decoded = TEST_CODER.decode(inStream);
+
+    assertEquals(value.getRow(), decoded.getRow());
+    assertEquals(value.getErrorMessage(), decoded.getErrorMessage());
+    assertEquals("dummy-project-id", decoded.getTable().getProjectId());
+    assertEquals("dummy-dataset-id", decoded.getTable().getDatasetId());
+    assertEquals("dummy-table-id", decoded.getTable().getTableId());
+  }
+
+  @Test
+  public void testDecodeEncodeWithNullTable() throws Exception {
+    TableRow row = new TableRow().set("field1", "value1");
+    BigQueryStorageApiInsertError value =
+        new BigQueryStorageApiInsertError(row, "An error message", null);
+
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+    TEST_CODER.encode(value, outStream);
+
+    ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
+    BigQueryStorageApiInsertError decoded = TEST_CODER.decode(inStream);
+
+    assertEquals(value.getRow(), decoded.getRow());
+    assertEquals(value.getErrorMessage(), decoded.getErrorMessage());
+    assertNull(decoded.getTable());
+  }
+
+  @Test
+  public void testDecodeEncodeWithNullErrorMessage() throws Exception {
+    TableRow row = new TableRow().set("field1", "value1");
+    BigQueryStorageApiInsertError value =
+        new BigQueryStorageApiInsertError(
+            row,
+            null,
+            new TableReference()
+                .setProjectId("dummy-project-id")
+                .setDatasetId("dummy-dataset-id")
+                .setTableId("dummy-table-id"));
+
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+    TEST_CODER.encode(value, outStream);
+
+    ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
+    BigQueryStorageApiInsertError decoded = TEST_CODER.decode(inStream);
+
+    assertEquals(value.getRow(), decoded.getRow());
+    assertNull(decoded.getErrorMessage());
+    assertEquals("dummy-project-id", decoded.getTable().getProjectId());
+    assertEquals("dummy-dataset-id", decoded.getTable().getDatasetId());
+    assertEquals("dummy-table-id", decoded.getTable().getTableId());
+  }
+
+  @Test
+  public void testDecodeEncodeWithAllNullableFieldsNull() throws Exception {
+    TableRow row = new TableRow().set("field1", "value1");
+    BigQueryStorageApiInsertError value = new BigQueryStorageApiInsertError(row, null, null);
+
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+    TEST_CODER.encode(value, outStream);
+
+    ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
+    BigQueryStorageApiInsertError decoded = TEST_CODER.decode(inStream);
+
+    assertEquals(value.getRow(), decoded.getRow());
+    assertNull(decoded.getErrorMessage());
+    assertNull(decoded.getTable());
+  }
+}
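
Usage note (a sketch, not part of the patch above): with this change, consumers of WriteResult.getFailedStorageApiInserts() can tell which destination a failed row was bound for via the new getTable() accessor. The fragment below is a minimal illustration under assumed names (the "rows" PCollection, the step names, and the destination table spec are hypothetical); BigQueryIO.writeTableRows(), WriteResult.getFailedStorageApiInserts(), MapElements, and BigQueryHelpers.toTableSpec() are existing Beam APIs. The table field stays nullable, so downstream code should still guard against a missing reference.

// Sketch only: assumes an existing PCollection<TableRow> named "rows" inside a pipeline
// and an illustrative destination table; not part of the patch above.
WriteResult result =
    rows.apply(
        "WriteToBigQuery",
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // hypothetical destination
            // schema, dispositions, and streaming options omitted for brevity
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));

result
    .getFailedStorageApiInserts()
    .apply(
        "FormatFailedRows",
        MapElements.into(TypeDescriptors.strings())
            .via(
                (BigQueryStorageApiInsertError err) -> {
                  // getTable() is the accessor added by this change; it may be null.
                  TableReference table = err.getTable();
                  String tableSpec =
                      table != null ? BigQueryHelpers.toTableSpec(table) : "<unknown table>";
                  return tableSpec + ": " + err.getErrorMessage();
                }));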