Skip to content

Commit 128d569

Browse files
Use TableReference instead of String for table identification in BigQueryStorageApiInsertError
Following reviewer feedback, this change updates BigQueryStorageApiInsertError to use TableReference instead of String tableUrn for better consistency with BigQueryInsertError.

Changes:
- Changed BigQueryStorageApiInsertError to use a TableReference field instead of String tableUrn
- Updated BigQueryStorageApiInsertErrorCoder to use BigQueryHelpers.toTableSpec() and parseTableSpec()
- Added null safety checks in the coder to prevent NullPointerException when the table is unknown
- Updated all calling sites to pass the TableReference from TableDestination.getTableReference():
  - StorageApiWriteUnshardedRecords.java (3 locations)
  - StorageApiWritesShardedRecords.java (3 locations + added tableReference variable)
  - StorageApiConvertMessages.java (1 location)
- Added TableReference imports to modified files

The null checks in the coder ensure job stability even when table information is unavailable during error handling, preventing pipeline failures in error reporting scenarios.
1 parent 8f36be3 commit 128d569

File tree

5 files changed

+32
-50
lines changed

5 files changed

+32
-50
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertError.java

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigquery;
1919

20+
import com.google.api.services.bigquery.model.TableReference;
2021
import com.google.api.services.bigquery.model.TableRow;
2122
import javax.annotation.Nullable;
2223

@@ -25,9 +26,7 @@ public class BigQueryStorageApiInsertError {
2526

2627
private @Nullable String errorMessage;
2728

28-
private @Nullable String tableUrn;
29-
30-
private @Nullable String[] parsedParts;
29+
private @Nullable TableReference table;
3130

3231
public BigQueryStorageApiInsertError(TableRow row) {
3332
this(row, null, null);
@@ -38,10 +37,10 @@ public BigQueryStorageApiInsertError(TableRow row, @Nullable String errorMessage
3837
}
3938

4039
public BigQueryStorageApiInsertError(
41-
TableRow row, @Nullable String errorMessage, @Nullable String tableUrn) {
40+
TableRow row, @Nullable String errorMessage, @Nullable TableReference table) {
4241
this.row = row;
4342
this.errorMessage = errorMessage;
44-
this.tableUrn = tableUrn;
43+
this.table = table;
4544
}
4645

4746
public TableRow getRow() {
@@ -54,37 +53,8 @@ public String getErrorMessage() {
5453
}
5554

5655
@Nullable
57-
public String getTableUrn() {
58-
return tableUrn;
59-
}
60-
61-
@Nullable
62-
public String getProjectId() {
63-
return getParsedPart(1);
64-
}
65-
66-
@Nullable
67-
public String getDatasetId() {
68-
return getParsedPart(3);
69-
}
70-
71-
@Nullable
72-
public String getTableId() {
73-
return getParsedPart(5);
74-
}
75-
76-
@Nullable
77-
private String getParsedPart(int index) {
78-
if (parsedParts == null && tableUrn != null && !tableUrn.isEmpty()) {
79-
String[] parts = tableUrn.split("/");
80-
if (parts.length == 6
81-
&& "projects".equals(parts[0])
82-
&& "datasets".equals(parts[2])
83-
&& "tables".equals(parts[4])) {
84-
parsedParts = parts;
85-
}
86-
}
87-
return parsedParts != null ? parsedParts[index] : null;
56+
public TableReference getTable() {
57+
return table;
8858
}
8959

9060
@Override

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertErrorCoder.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigquery;
1919

20+
import com.google.api.services.bigquery.model.TableReference;
2021
import com.google.api.services.bigquery.model.TableRow;
2122
import java.io.IOException;
2223
import java.io.InputStream;
@@ -42,15 +43,18 @@ public void encode(BigQueryStorageApiInsertError value, OutputStream outStream)
4243
throws IOException {
4344
TABLE_ROW_CODER.encode(value.getRow(), outStream);
4445
STRING_CODER.encode(value.getErrorMessage(), outStream);
45-
STRING_CODER.encode(value.getTableUrn(), outStream);
46+
TableReference table = value.getTable();
47+
String tableSpec = table != null ? BigQueryHelpers.toTableSpec(table) : null;
48+
STRING_CODER.encode(tableSpec, outStream);
4649
}
4750

4851
@Override
4952
public BigQueryStorageApiInsertError decode(InputStream inStream)
5053
throws CoderException, IOException {
51-
return new BigQueryStorageApiInsertError(
52-
TABLE_ROW_CODER.decode(inStream),
53-
STRING_CODER.decode(inStream),
54-
STRING_CODER.decode(inStream));
54+
TableRow row = TABLE_ROW_CODER.decode(inStream);
55+
String errorMessage = STRING_CODER.decode(inStream);
56+
String tableSpec = STRING_CODER.decode(inStream);
57+
TableReference table = tableSpec != null ? BigQueryHelpers.parseTableSpec(tableSpec) : null;
58+
return new BigQueryStorageApiInsertError(row, errorMessage, table);
5559
}
5660
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
2121

22+
import com.google.api.services.bigquery.model.TableReference;
2223
import com.google.api.services.bigquery.model.TableRow;
2324
import java.io.IOException;
2425
import org.apache.beam.sdk.coders.Coder;
@@ -186,15 +187,15 @@ public void processElement(
186187
badRecordRouter.route(o, element, elementCoder, e, "Unable to convert value to TableRow");
187188
return;
188189
}
189-
String tableUrn = null;
190+
TableReference tableReference = null;
190191
TableDestination tableDestination = dynamicDestinations.getTable(element.getKey());
191192
if (tableDestination != null) {
192-
tableUrn = tableDestination.getTableUrn(pipelineOptions.as(BigQueryOptions.class));
193+
tableReference = tableDestination.getTableReference();
193194
}
194195
o.get(failedWritesTag)
195196
.output(
196197
new BigQueryStorageApiInsertError(
197-
failsafeTableRow, conversionException.toString(), tableUrn));
198+
failsafeTableRow, conversionException.toString(), tableReference));
198199
} catch (Exception e) {
199200
badRecordRouter.route(
200201
o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,8 @@ void addMessage(
608608
org.joda.time.Instant timestamp = payload.getTimestamp();
609609
rowsSentToFailedRowsCollection.inc();
610610
failedRowsReceiver.outputWithTimestamp(
611-
new BigQueryStorageApiInsertError(tableRow, e.toString(), tableUrn),
611+
new BigQueryStorageApiInsertError(
612+
tableRow, e.toString(), tableDestination.getTableReference()),
612613
timestamp != null ? timestamp : elementTs);
613614
return;
614615
}
@@ -668,7 +669,9 @@ long flush(
668669
org.joda.time.Instant timestamp = insertTimestamps.get(i);
669670
failedRowsReceiver.outputWithTimestamp(
670671
new BigQueryStorageApiInsertError(
671-
failedRow, "Row payload too large. Maximum size " + maxRequestSize, tableUrn),
672+
failedRow,
673+
"Row payload too large. Maximum size " + maxRequestSize,
674+
tableDestination.getTableReference()),
672675
timestamp);
673676
}
674677
int numRowsFailed = inserts.getSerializedRowsCount();
@@ -755,7 +758,7 @@ long flush(
755758
new BigQueryStorageApiInsertError(
756759
failedRow,
757760
error.getRowIndexToErrorMessage().get(failedIndex),
758-
tableUrn);
761+
tableDestination.getTableReference());
759762
} catch (Exception e) {
760763
LOG.error("Failed to insert row and could not parse the result!", e);
761764
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.ApiFutures;
24+
import com.google.api.services.bigquery.model.TableReference;
2425
import com.google.api.services.bigquery.model.TableRow;
2526
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
2627
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
@@ -481,6 +482,7 @@ public void process(
481482
});
482483
final String tableId = tableDestination.getTableUrn(bigQueryOptions);
483484
final String shortTableId = tableDestination.getShortTableUrn();
485+
final TableReference tableReference = tableDestination.getTableReference();
484486
final DatasetService datasetService = getDatasetService(pipelineOptions);
485487
final WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions);
486488

@@ -620,7 +622,7 @@ public void process(
620622
o.get(failedRowsTag)
621623
.outputWithTimestamp(
622624
new BigQueryStorageApiInsertError(
623-
failedRow.getValue(), errorMessage, tableId),
625+
failedRow.getValue(), errorMessage, tableReference),
624626
failedRow.getTimestamp());
625627
rowsSentToFailedRowsCollection.inc();
626628
BigQuerySinkMetrics.appendRowsRowStatusCounter(
@@ -740,7 +742,9 @@ public void process(
740742
o.get(failedRowsTag)
741743
.outputWithTimestamp(
742744
new BigQueryStorageApiInsertError(
743-
failedRow, error.getRowIndexToErrorMessage().get(failedIndex), tableId),
745+
failedRow,
746+
error.getRowIndexToErrorMessage().get(failedIndex),
747+
tableReference),
744748
timestamp);
745749
}
746750
int failedRows = failedRowIndices.size();
@@ -913,7 +917,7 @@ public void process(
913917
new BigQueryStorageApiInsertError(
914918
failedRow,
915919
"Row payload too large. Maximum size " + maxRequestSize,
916-
tableId),
920+
tableReference),
917921
timestamp);
918922
}
919923
int numRowsFailed = splitValue.getProtoRows().getSerializedRowsCount();

0 commit comments

Comments
 (0)