Skip to content

Commit fa8c5b5

Browse files
authored
Fix data loss from BigQuery write exception during GCSBatchTableWriter run (#153)
* Added reporting of SinkRecords that were not written due to an exception in GcsToBqWriter
* Updated license info and cleaned up code
* Updated license
1 parent 38aeee1 commit fa8c5b5

File tree

11 files changed

+212
-69
lines changed

11 files changed

+212
-69
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ private void writeSinkRecords(Collection<SinkRecord> records) {
242242
table.getBaseTableId(),
243243
config.getString(BigQuerySinkConfig.GCS_BUCKET_NAME_CONFIG),
244244
gcsBlobName,
245-
recordConverter);
245+
recordConverter,
246+
errantRecordHandler);
246247
} else {
247248
TableWriter.Builder simpleTableWriterBuilder =
248249
new TableWriter.Builder(bigQueryWriter, table, recordConverter);

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ public void createOrUpdateTable(TableId table, List<SinkRecord> records) {
235235
* @param table The BigQuery table to create.
236236
* @param records The sink records used to determine the schema.
237237
* @return whether the table had to be created; if the table already existed, will return false
238+
* @throws BigQueryException on non-recoverable BigQuery error.
238239
*/
239240
public boolean createTable(TableId table, List<SinkRecord> records) {
240241
synchronized (lock(tableCreateLocks, table)) {

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GcsBatchTableWriter.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@
2525

2626
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
2727
import com.google.cloud.bigquery.TableId;
28+
import com.wepay.kafka.connect.bigquery.ErrantRecordHandler;
2829
import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
2930
import com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter;
3031
import com.wepay.kafka.connect.bigquery.write.row.GcsToBqWriter;
3132
import java.util.Comparator;
33+
import java.util.HashMap;
34+
import java.util.Map;
3235
import java.util.SortedMap;
3336
import java.util.TreeMap;
3437
import org.apache.kafka.connect.errors.ConnectException;
@@ -50,6 +53,7 @@ public class GcsBatchTableWriter implements Runnable {
5053

5154
private final SortedMap<SinkRecord, RowToInsert> rows;
5255
private final GcsToBqWriter writer;
56+
private final ErrantRecordHandler errantRecordHandler;
5357

5458
/**
5559
* @param rows The list of rows that should be written through GCS
@@ -63,22 +67,25 @@ private GcsBatchTableWriter(SortedMap<SinkRecord, RowToInsert> rows,
6367
GcsToBqWriter writer,
6468
TableId tableId,
6569
String bucketName,
66-
String baseBlobName) {
70+
String baseBlobName,
71+
ErrantRecordHandler errantRecordHandler) {
6772
this.tableId = tableId;
6873
this.bucketName = bucketName;
6974
this.blobName = baseBlobName;
70-
7175
this.rows = rows;
7276
this.writer = writer;
77+
this.errantRecordHandler = errantRecordHandler;
7378
}
7479

7580
@Override
7681
public void run() {
7782
try {
7883
writer.writeRows(rows, tableId, bucketName, blobName);
79-
} catch (ConnectException ex) {
84+
} catch (RuntimeException ex) {
85+
errantRecordHandler.reportErrantRecords(rows.keySet(), ex);
8086
throw new ConnectException("Failed to write rows to GCS", ex);
8187
} catch (InterruptedException ex) {
88+
errantRecordHandler.reportErrantRecords(rows.keySet(), ex);
8289
throw new ConnectException("Thread interrupted while batch writing", ex);
8390
}
8491
}
@@ -92,22 +99,25 @@ public static class Builder implements TableWriterBuilder {
9299
private final SortedMap<SinkRecord, RowToInsert> rows;
93100
private final SinkRecordConverter recordConverter;
94101
private final GcsToBqWriter writer;
95-
private String blobName;
102+
private final String blobName;
103+
private final ErrantRecordHandler errantRecordHandler;
96104

97105
/**
98106
* Create a {@link GcsBatchTableWriter.Builder}.
99107
*
100-
* @param writer the {@link GcsToBqWriter} to use.
101-
* @param tableId The bigquery table to be written to.
102-
* @param gcsBucketName The GCS bucket to write to.
103-
* @param gcsBlobName The name of the GCS blob to write.
104-
* @param recordConverter the {@link RecordConverter} to use.
108+
* @param writer the {@link GcsToBqWriter} to use.
109+
* @param tableId The bigquery table to be written to.
110+
* @param gcsBucketName The GCS bucket to write to.
111+
* @param gcsBlobName The name of the GCS blob to write.
112+
* @param recordConverter the {@link RecordConverter} to use.
113+
* @param errantRecordHandler the handler for records that can not be written.
105114
*/
106115
public Builder(GcsToBqWriter writer,
107116
TableId tableId,
108117
String gcsBucketName,
109118
String gcsBlobName,
110-
SinkRecordConverter recordConverter) {
119+
SinkRecordConverter recordConverter,
120+
ErrantRecordHandler errantRecordHandler) {
111121

112122
this.bucketName = gcsBucketName;
113123
this.blobName = gcsBlobName;
@@ -117,6 +127,7 @@ public Builder(GcsToBqWriter writer,
117127
.thenComparing(SinkRecord::kafkaOffset));
118128
this.recordConverter = recordConverter;
119129
this.writer = writer;
130+
this.errantRecordHandler = errantRecordHandler;
120131
}
121132

122133
@Override
@@ -126,7 +137,7 @@ public void addRow(SinkRecord record, TableId table) {
126137

127138
@Override
128139
public GcsBatchTableWriter build() {
129-
return new GcsBatchTableWriter(rows, writer, tableId, bucketName, blobName);
140+
return new GcsBatchTableWriter(rows, writer, tableId, bucketName, blobName, errantRecordHandler);
130141
}
131142
}
132143
}

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import com.google.cloud.BaseServiceException;
2727
import com.google.cloud.bigquery.BigQuery;
28+
import com.google.cloud.bigquery.BigQueryException;
2829
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
2930
import com.google.cloud.bigquery.Table;
3031
import com.google.cloud.bigquery.TableId;
@@ -123,6 +124,7 @@ private static Map<String, String> getMetadata(TableId tableId) {
123124
* @param bucketName the GCS bucket to write to.
124125
* @param blobName the name of the GCS blob to write.
125126
* @throws InterruptedException if interrupted.
127+
* @throws BigQueryException on BigQuery error.
126128
*/
127129
public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
128130
TableId tableId,

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import static org.junit.jupiter.api.Assertions.assertThrows;
3030
import static org.junit.jupiter.api.Assertions.assertTrue;
3131
import static org.mockito.ArgumentMatchers.any;
32-
import static org.mockito.ArgumentMatchers.anyObject;
3332
import static org.mockito.Mockito.mock;
3433
import static org.mockito.Mockito.times;
3534
import static org.mockito.Mockito.verify;
@@ -44,6 +43,7 @@
4443
import com.google.cloud.bigquery.LegacySQLTypeName;
4544
import com.google.cloud.bigquery.QueryJobConfiguration;
4645
import com.google.cloud.bigquery.Table;
46+
import com.google.cloud.bigquery.TableId;
4747
import com.google.cloud.storage.BlobInfo;
4848
import com.google.cloud.storage.Storage;
4949
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
@@ -58,7 +58,6 @@
5858
import com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiBase;
5959
import com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiDefaultStream;
6060
import java.net.SocketTimeoutException;
61-
import java.time.Duration;
6261
import java.util.Arrays;
6362
import java.util.Collections;
6463
import java.util.Map;
@@ -258,7 +257,7 @@ public void testSimplePut() {
258257

259258
BigQuery bigQuery = mock(BigQuery.class);
260259
Table mockTable = mock(Table.class);
261-
when(bigQuery.getTable(any())).thenReturn(mockTable);
260+
when(bigQuery.getTable(any(TableId.class))).thenReturn(mockTable);
262261

263262
Storage storage = mock(Storage.class);
264263

@@ -308,7 +307,7 @@ public void testPutForGCSToBQ() {
308307
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
309308
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
310309

311-
when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
310+
when(bigQuery.insertAll(any())).thenReturn(insertAllResponse);
312311
when(insertAllResponse.hasErrors()).thenReturn(false);
313312

314313
SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
@@ -332,7 +331,7 @@ public void testPutForGCSToBQ() {
332331
ArgumentCaptor<BlobInfo> blobInfo = ArgumentCaptor.forClass(BlobInfo.class);
333332
testTask.flush(Collections.emptyMap());
334333

335-
verify(storage, times(repeats)).create(blobInfo.capture(), (byte[]) anyObject());
334+
verify(storage, times(repeats)).create(blobInfo.capture(), any(byte[].class));
336335
assertEquals(repeats, blobInfo.getAllValues().stream().map(info -> info.getBlobId().getName()).collect(Collectors.toSet()).size());
337336
}
338337

@@ -353,7 +352,7 @@ public void testSimplePutWhenSchemaRetrieverIsNotNull() {
353352
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
354353
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
355354

356-
when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
355+
when(bigQuery.insertAll(any())).thenReturn(insertAllResponse);
357356
when(insertAllResponse.hasErrors()).thenReturn(false);
358357

359358
SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
@@ -456,7 +455,7 @@ public void testPutWithUpsertDelete() throws Exception {
456455
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
457456

458457
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
459-
when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
458+
when(bigQuery.insertAll(any())).thenReturn(insertAllResponse);
460459
when(insertAllResponse.hasErrors()).thenReturn(false);
461460

462461
SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
@@ -533,12 +532,12 @@ public void testSimplePutException() throws InterruptedException {
533532

534533
BigQuery bigQuery = mock(BigQuery.class);
535534
Table mockTable = mock(Table.class);
536-
when(bigQuery.getTable(any())).thenReturn(mockTable);
535+
when(bigQuery.getTable(any(TableId.class))).thenReturn(mockTable);
537536

538537
Storage storage = mock(Storage.class);
539538
String error = "Cannot add required fields to an existing schema.";
540539
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
541-
when(bigQuery.insertAll(any()))
540+
when(bigQuery.insertAll(any(InsertAllRequest.class)))
542541
.thenThrow(
543542
new BigQueryException(400, error, new BigQueryError("invalid", "global", error)));
544543

@@ -603,7 +602,7 @@ public void testFlushAfterStop() {
603602
Storage storage = mock(Storage.class);
604603

605604
BigQuery bigQuery = mock(BigQuery.class);
606-
when(bigQuery.insertAll(any()))
605+
when(bigQuery.insertAll(any(InsertAllRequest.class)))
607606
.thenThrow(
608607
new BigQueryException(400, "Oops", new BigQueryError("invalid", "global", "oops")));
609608

@@ -655,7 +654,7 @@ public void testBigQueryReadTimeout() {
655654
initialize(properties);
656655

657656
BigQuery bigQuery = mock(BigQuery.class);
658-
when(bigQuery.getTable(any())).thenThrow(new BigQueryException(new SocketTimeoutException("mock timeout")));
657+
when(bigQuery.getTable(any(TableId.class))).thenThrow(new BigQueryException(new SocketTimeoutException("mock timeout")));
659658

660659
Storage storage = mock(Storage.class);
661660
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
@@ -694,12 +693,12 @@ public void testBigQuery5XXRetry() {
694693

695694
BigQuery bigQuery = mock(BigQuery.class);
696695
Table mockTable = mock(Table.class);
697-
when(bigQuery.getTable(any())).thenReturn(mockTable);
696+
when(bigQuery.getTable(any(TableId.class))).thenReturn(mockTable);
698697

699698
Storage storage = mock(Storage.class);
700699

701700
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
702-
when(bigQuery.insertAll(anyObject()))
701+
when(bigQuery.insertAll(any(InsertAllRequest.class)))
703702
.thenThrow(new BigQueryException(500, "mock 500"))
704703
.thenThrow(new BigQueryException(502, "mock 502"))
705704
.thenThrow(new BigQueryException(503, "mock 503"))
@@ -725,7 +724,7 @@ public void testBigQuery5XXRetry() {
725724
testTask.put(Collections.singletonList(spoofSinkRecord(topic)));
726725
testTask.flush(Collections.emptyMap());
727726

728-
verify(bigQuery, times(4)).insertAll(anyObject());
727+
verify(bigQuery, times(4)).insertAll(any(InsertAllRequest.class));
729728
}
730729

731730
@Test
@@ -742,14 +741,14 @@ public void testBigQuery403Retry() {
742741

743742
BigQuery bigQuery = mock(BigQuery.class);
744743
Table mockTable = mock(Table.class);
745-
when(bigQuery.getTable(any())).thenReturn(mockTable);
744+
when(bigQuery.getTable(any(TableId.class))).thenReturn(mockTable);
746745

747746
Storage storage = mock(Storage.class);
748747

749748
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
750749
BigQueryError quotaExceededError = new BigQueryError("quotaExceeded", null, null);
751750
BigQueryError rateLimitExceededError = new BigQueryError("rateLimitExceeded", null, null);
752-
when(bigQuery.insertAll(anyObject()))
751+
when(bigQuery.insertAll(any(InsertAllRequest.class)))
753752
.thenThrow(new BigQueryException(403, "mock quota exceeded", quotaExceededError))
754753
.thenThrow(new BigQueryException(403, "mock rate limit exceeded", rateLimitExceededError))
755754
.thenReturn(insertAllResponse);
@@ -774,7 +773,7 @@ public void testBigQuery403Retry() {
774773
testTask.put(Collections.singletonList(spoofSinkRecord(topic)));
775774
testTask.flush(Collections.emptyMap());
776775

777-
verify(bigQuery, times(3)).insertAll(anyObject());
776+
verify(bigQuery, times(3)).insertAll(any(InsertAllRequest.class));
778777
}
779778

780779
@Test
@@ -791,13 +790,13 @@ public void testBigQueryRetryExceeded() {
791790

792791
BigQuery bigQuery = mock(BigQuery.class);
793792
Table mockTable = mock(Table.class);
794-
when(bigQuery.getTable(any())).thenReturn(mockTable);
793+
when(bigQuery.getTable(any(TableId.class))).thenReturn(mockTable);
795794

796795
Storage storage = mock(Storage.class);
797796

798797
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
799798
BigQueryError quotaExceededError = new BigQueryError("quotaExceeded", null, null);
800-
when(bigQuery.insertAll(anyObject()))
799+
when(bigQuery.insertAll(any(InsertAllRequest.class)))
801800
.thenThrow(new BigQueryException(403, "mock quota exceeded", quotaExceededError));
802801
when(insertAllResponse.hasErrors()).thenReturn(false);
803802

@@ -837,7 +836,7 @@ public void testInterruptedException() {
837836

838837
BigQuery bigQuery = mock(BigQuery.class);
839838
Table mockTable = mock(Table.class);
840-
when(bigQuery.getTable(any())).thenReturn(mockTable);
839+
when(bigQuery.getTable(any(TableId.class))).thenReturn(mockTable);
841840

842841
Storage storage = mock(Storage.class);
843842
InsertAllResponse fakeResponse = mock(InsertAllResponse.class);
@@ -891,12 +890,12 @@ public void testStop() {
891890

892891
BigQuery bigQuery = mock(BigQuery.class);
893892
Table mockTable = mock(Table.class);
894-
when(bigQuery.getTable(any())).thenReturn(mockTable);
893+
when(bigQuery.getTable(any(TableId.class))).thenReturn(mockTable);
895894

896895
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
897896
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
898897

899-
when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
898+
when(bigQuery.insertAll(any(InsertAllRequest.class))).thenReturn(insertAllResponse);
900899
when(insertAllResponse.hasErrors()).thenReturn(false);
901900

902901
SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQueryStorageApiSinkTaskTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
import static com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiWriter.DEFAULT;
2727
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
2828
import static org.junit.jupiter.api.Assertions.assertThrows;
29+
import static org.mockito.ArgumentMatchers.any;
30+
import static org.mockito.ArgumentMatchers.anyList;
2931
import static org.mockito.ArgumentMatchers.eq;
30-
import static org.mockito.Matchers.any;
3132
import static org.mockito.Mockito.CALLS_REAL_METHODS;
3233
import static org.mockito.Mockito.doNothing;
3334
import static org.mockito.Mockito.doThrow;
@@ -89,7 +90,7 @@ public void setUp() {
8990
spoofedRecordOffset.set(0);
9091

9192
doNothing().when(mockedStorageWriteApiDefaultStream)
92-
.initializeAndWriteRecords(any(PartitionedTableId.class), any(), eq(DEFAULT));
93+
.initializeAndWriteRecords(any(PartitionedTableId.class), anyList(), eq(DEFAULT));
9394
doNothing().when(mockedStorageWriteApiDefaultStream).shutdown();
9495

9596
testTask.initialize(sinkTaskContext);
@@ -102,15 +103,15 @@ public void testPut() {
102103
testTask.flush(Collections.emptyMap());
103104

104105
verify(mockedStorageWriteApiDefaultStream, times(1))
105-
.initializeAndWriteRecords(any(PartitionedTableId.class), any(), eq(DEFAULT));
106+
.initializeAndWriteRecords(any(PartitionedTableId.class), anyList(), eq(DEFAULT));
106107
}
107108

108109
@Test
109110
public void testSimplePutException() {
110111
BigQueryStorageWriteApiConnectException exception = new BigQueryStorageWriteApiConnectException("error 12345");
111112

112113
doThrow(exception).when(mockedStorageWriteApiDefaultStream)
113-
.initializeAndWriteRecords(any(PartitionedTableId.class), any(), eq(DEFAULT));
114+
.initializeAndWriteRecords(any(PartitionedTableId.class), anyList(), eq(DEFAULT));
114115

115116
testTask.put(Collections.singletonList(spoofSinkRecord()));
116117
BigQueryConnectException e = assertThrows(

0 commit comments

Comments (0)