Skip to content

Commit 7dfff70

Browse files
Merge pull request #164 from Claudenw/GH-163_bqWriter_schemaManager_failure
Fix GscBqWriter writing incorrect records to DLQ
2 parents dc64eeb + 648d039 commit 7dfff70

File tree

2 files changed

+230
-31
lines changed

2 files changed

+230
-31
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriter.java

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -144,33 +144,35 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
144144
Table table = executeWithRetry(() -> bigQuery.getTable(tableId), timeout);
145145
boolean lookupSuccess = table != null;
146146

147-
if (autoCreateTables && !lookupSuccess) {
148-
logger.info("Table {} was not found. Creating the table automatically.", tableId);
149-
Boolean created =
150-
executeWithRetry(
151-
() -> schemaManager.createTable(tableId, sinkRecords), timeout);
152-
if (created == null || !created) {
153-
throw new BigQueryConnectException("Failed to create table " + tableId);
147+
if (!lookupSuccess) {
148+
if (autoCreateTables) {
149+
logger.info("Table {} was not found. Creating the table automatically.", tableId);
150+
Boolean created =
151+
executeWithRetry(
152+
() -> schemaManager.createTable(tableId, sinkRecords), timeout);
153+
if (created == null) {
154+
throw new BigQueryConnectException("Failed to create table " + tableId);
155+
}
156+
157+
} else {
158+
time.sleep(retryWaitMs);
154159
}
155160
table = executeWithRetry(() -> bigQuery.getTable(tableId), timeout);
156161
lookupSuccess = table != null;
157-
}
158-
159-
if (!lookupSuccess) {
160-
throw new BigQueryConnectException("Failed to lookup table " + tableId);
162+
if (!lookupSuccess) {
163+
throw new BigQueryConnectException("Failed to lookup table " + tableId);
164+
}
161165
}
162166

163167
if (attemptSchemaUpdate && schemaManager != null && !sinkRecords.isEmpty()) {
164-
Boolean schemaUpdated =
165-
executeWithRetry(
166-
() -> {
167-
schemaManager.updateSchema(tableId, sinkRecords);
168-
return Boolean.TRUE;
169-
},
170-
timeout
171-
);
168+
Boolean schemaUpdated = executeWithRetry(() -> {
169+
schemaManager.updateSchema(tableId, sinkRecords);
170+
return Boolean.TRUE;
171+
},
172+
timeout
173+
);
172174
if (schemaUpdated == null) {
173-
throw new ConnectException(
175+
throw new BigQueryConnectException(
174176
String.format("Failed to update schema for table %s within %d re-attempts.", tableId, retries)
175177
);
176178
}
@@ -199,7 +201,7 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
199201

200202
// If executeWithRetry timed out (budget exhausted) it returns null → fail like before
201203
if (uploaded == null) {
202-
throw new ConnectException(
204+
throw new BigQueryConnectException(
203205
String.format("Failed to load %d rows into GCS within %d re-attempts.", rows.size(), retries)
204206
);
205207
}
@@ -253,6 +255,7 @@ private String toJson(Collection<RowToInsert> rows) {
253255
* @param func the operation to execute
254256
* @param timeout maximum time to keep retrying (sleep is clamped by remaining time)
255257
* @return result of the function, or {@code null} if timeout expires before a successful call
258+
* @throws BigQueryConnectException if thrown by func and not a retryable BaseServiceException
256259
*/
257260
private <T> T executeWithRetry(Supplier<T> func, Duration timeout) throws InterruptedException {
258261
final long start = time.milliseconds(); // explicit clock to compute remaining
@@ -265,13 +268,15 @@ private <T> T executeWithRetry(Supplier<T> func, Duration timeout) throws Interr
265268
return func.get();
266269
} catch (BaseServiceException e) {
267270
if (!e.isRetryable()) {
268-
logger.error("Non-retryable exception on attempt {}", attempt + 1, e);
269-
throw e;
271+
String msg = String.format("Non-retryable exception on attempt %s.", attempt + 1);
272+
logger.error(msg);
273+
throw new BigQueryConnectException(msg, e);
270274
}
271275
if (attempt >= retries) {
272276
// Out of configured retries
273-
logger.error("Operation failed after {} attempts (no retries left).", attempt + 1);
274-
throw e;
277+
String msg = String.format("Operation failed after %s attempts (no retries left).", attempt + 1);
278+
logger.error(msg);
279+
throw new BigQueryConnectException(msg, e);
275280
}
276281

277282
// Compute next backoff = min(MAX_BACKOFF_MS, retryWaitMs * 2^attempt)
@@ -283,8 +288,9 @@ private <T> T executeWithRetry(Supplier<T> func, Duration timeout) throws Interr
283288
long elapsed = time.milliseconds() - start;
284289
long remaining = budget - elapsed;
285290
if (remaining <= 0) {
286-
logger.error("Timeout expired after {} attempts within {} ms budget.", attempt + 1, budget);
287-
return null;
291+
String msg = String.format("Timeout expired after %s attempts within %s ms budget.", attempt + 1, budget);
292+
logger.error(msg);
293+
throw new BigQueryConnectException(msg, e);
288294
}
289295
delay = Math.min(delay, Math.max(0L, remaining));
290296
}
@@ -303,6 +309,10 @@ private <T> T executeWithRetry(Supplier<T> func, Duration timeout) throws Interr
303309
time.sleep(delay);
304310
}
305311
attempt++;
312+
} catch (RuntimeException e) {
313+
String msg = "Operation failed during executeWithRetry: " + e.getMessage();
314+
logger.error(msg);
315+
throw new BigQueryConnectException(msg, e);
306316
}
307317
}
308318
}

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriterTest.java

Lines changed: 193 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@
2323

2424
package com.wepay.kafka.connect.bigquery.write.row;
2525

26+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
2627
import static org.junit.jupiter.api.Assertions.assertThrows;
2728
import static org.junit.jupiter.api.Assertions.assertEquals;
2829
import static org.junit.jupiter.api.Assertions.assertTrue;
2930
import static org.junit.jupiter.api.Assumptions.assumeTrue;
3031
import static org.mockito.ArgumentMatchers.any;
3132
import static org.mockito.ArgumentMatchers.anyList;
33+
import static org.mockito.ArgumentMatchers.eq;
34+
import static org.mockito.Mockito.doThrow;
3235
import static org.mockito.Mockito.mock;
3336
import static org.mockito.Mockito.times;
3437
import static org.mockito.Mockito.verify;
@@ -38,6 +41,7 @@
3841
import static org.mockito.Mockito.atMost;
3942
import static org.mockito.Mockito.verifyNoMoreInteractions;
4043

44+
import com.google.cloud.bigquery.TableInfo;
4145
import com.google.gson.Gson;
4246
import com.google.gson.JsonIOException;
4347
import com.google.cloud.bigquery.BigQuery;
@@ -61,6 +65,8 @@
6165
import com.wepay.kafka.connect.bigquery.write.storage.StorageApiBatchModeHandler;
6266
import com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiDefaultStream;
6367
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
68+
69+
6470
import java.nio.ByteBuffer;
6571
import java.util.LinkedHashMap;
6672
import java.util.Collections;
@@ -83,6 +89,7 @@
8389
import org.junit.jupiter.api.Test;
8490

8591
import io.debezium.data.VariableScaleDecimal;
92+
import org.mockito.stubbing.OngoingStubbing;
8693

8794
public class GcsToBqWriterTest {
8895

@@ -91,11 +98,16 @@ public class GcsToBqWriterTest {
9198
private static StorageWriteApiDefaultStream mockedStorageWriteApiDefaultStream = mock(StorageWriteApiDefaultStream.class);
9299
private static StorageApiBatchModeHandler mockedBatchHandler = mock(StorageApiBatchModeHandler.class);
93100

101+
private static final TableId tableId = TableId.of("ds", "tbl");
102+
private static final Table table = mock(Table.class);
103+
94104
private final Time time = new MockTime();
95105

96106
@BeforeAll
97107
public static void initializePropertiesFactory() {
98108
propertiesFactory = new SinkPropertiesFactory();
109+
when(table.getTableId()).thenReturn(tableId);
110+
99111
}
100112

101113
@Test
@@ -230,7 +242,7 @@ public void happyPathNoRetry() throws Exception {
230242
storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, false, mockTime);
231243

232244
long t0 = mockTime.milliseconds();
233-
writer.writeRows(oneRow(), TableId.of("ds", "tbl"), "bucket", "blob");
245+
writer.writeRows(oneRow(), tableId, "bucket", "blob");
234246
long elapsed = mockTime.milliseconds() - t0;
235247

236248
// One lookup, one upload; no retries, no sleeps → elapsed should be 0
@@ -260,7 +272,7 @@ public void schemaUpdatedWhenEnabled() throws Exception {
260272
new GcsToBqWriter(
261273
storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, true, mockTime);
262274

263-
writer.writeRows(oneRow(), TableId.of("ds", "tbl"), "bucket", "blob");
275+
writer.writeRows(oneRow(), tableId, "bucket", "blob");
264276

265277
verify(schemaManager, times(1)).updateSchema(any(TableId.class), anyList());
266278
verify(storage, times(1)).create(any(BlobInfo.class), any(byte[].class));
@@ -291,7 +303,7 @@ public void backoffIsCapped() throws Exception {
291303
storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, true, mockTime);
292304

293305
long t0 = mockTime.milliseconds();
294-
writer.writeRows(oneRow(), TableId.of("ds", "tbl"), "bucket", "blob");
306+
writer.writeRows(oneRow(), tableId, "bucket", "blob");
295307
long elapsed = mockTime.milliseconds() - t0;
296308

297309
long minExpected = 20_000; // Budget = retries(4) * retryWaitMs(5000) = 20s
@@ -338,7 +350,7 @@ public void budgetCutsBeforeAllRetries() throws Exception {
338350
// with a BigQueryConnectException (null table interpreted as lookup failure).
339351
assertThrows(
340352
BigQueryConnectException.class,
341-
() -> writer.writeRows(oneRow(), TableId.of("ds", "tbl"), "bucket", "blob"));
353+
() -> writer.writeRows(oneRow(), tableId, "bucket", "blob"));
342354

343355
// We expect multiple getTable() attempts until budget expires.
344356
// Because jitter can vary up to 1s per sleep and sleeps are clamped by remaining budget,
@@ -352,6 +364,183 @@ public void budgetCutsBeforeAllRetries() throws Exception {
352364
verify(storage, never()).create(any(BlobInfo.class), any(byte[].class));
353365
}
354366

367+
368+
/**
369+
* A mocked SchemaManager.
370+
* @param createTable value for {@code createTable()} call. null == exception, else value.
371+
* @param schemaUpdate value for {@code updateSchema()} call. null == exception, false = BQException, true = success
372+
* @return A mock schema manager
373+
*/
374+
private SchemaManager mockSchemaManager(Boolean createTable, Boolean schemaUpdate) {
375+
SchemaManager schemaManager = mock(SchemaManager.class);
376+
377+
if (createTable == null) {
378+
doThrow(new IllegalArgumentException("SchemaManager create table failed")).when(schemaManager).createTable(eq(tableId), anyList());
379+
} else {
380+
when(schemaManager.createTable(eq(tableId), anyList())).thenReturn(createTable);
381+
}
382+
383+
if (schemaUpdate == null) {
384+
doThrow(new UnsupportedOperationException("SchemaManager threw exception")).when(schemaManager).updateSchema(any(), anyList());
385+
} else if (!schemaUpdate) {
386+
doThrow(new BigQueryConnectException("SchemaManager schema update failed")).when(schemaManager).updateSchema(any(), anyList());
387+
}
388+
return schemaManager;
389+
}
390+
391+
/**
392+
* Create a mocked big query.
393+
* @param falseCount the number of times to report the table is not found.
394+
* @param hasTable if true a table is returned on the falseCount + 1 request.
395+
* @return a mocked BigQuery.
396+
*/
397+
private BigQuery mockBigQuery(int falseCount, boolean hasTable) {
398+
BigQuery bigQuery = mock(BigQuery.class);
399+
OngoingStubbing<Table> stub = when(bigQuery.getTable(eq(tableId)));
400+
for (int i = 0; i < falseCount; i++) {
401+
stub = stub.thenReturn(null);
402+
}
403+
if (hasTable) {
404+
stub.thenReturn(table);
405+
}
406+
407+
return bigQuery;
408+
}
409+
410+
411+
/**
412+
* A mocked Storage.
413+
* @param retryError null = no error, true = retryable error, false = non-retryable error.
414+
* @return a mocked storage that succeeds or fails based on retryError flag.
415+
*/
416+
private Storage mockStorage(Boolean retryError) {
417+
Storage storage = mock(Storage.class);
418+
if (retryError != null) {
419+
StorageException storageException = retryError ? new StorageException(500, "it failed") : new StorageException(400, "it failed");
420+
when(storage.create(any(BlobInfo.class), any(byte[].class))).thenThrow(storageException);
421+
}
422+
return storage;
423+
}
424+
425+
426+
@Test
427+
void writeRowsCreateTableTest() {
428+
final int retries = 4;
429+
final long retryWaitMs = 100L;
430+
final boolean attemptSchemaUpdate = false;
431+
432+
Time mockTime = new MockTime(); // virtual clock; sleep() advances time but doesn’t block
433+
434+
// BigQuery does not have the table, schema manager should not be called, any call to the schema manager will result in an exception.
435+
String msg = assertThrows(BigQueryConnectException.class, () -> new GcsToBqWriter(mockStorage(null), mockBigQuery(1, false), mockSchemaManager(null, null),
436+
retries, retryWaitMs, false, attemptSchemaUpdate, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
437+
"no table, schema manager exception"
438+
).getMessage();
439+
assertEquals("Failed to lookup table " + tableId, msg);
440+
441+
// BigQuery does not have the table. Schema manager will return true
442+
msg = assertThrows(BigQueryConnectException.class, () -> new GcsToBqWriter(mockStorage(null), mockBigQuery(1, false), mockSchemaManager(true, null),
443+
retries, retryWaitMs, true, attemptSchemaUpdate, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
444+
"no table, schema manager reports success").getMessage();
445+
assertEquals("Failed to lookup table " + tableId, msg);
446+
447+
// BigQuery does not have the table. Schema manager will return false
448+
msg = assertThrows(BigQueryConnectException.class, () -> new GcsToBqWriter(mockStorage(null), mockBigQuery(1, false), mockSchemaManager(false, null),
449+
retries, retryWaitMs, true, attemptSchemaUpdate, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
450+
"no table, schema manager reports failure").getMessage();
451+
assertEquals("Failed to lookup table " + tableId, msg);
452+
453+
// BigQuery does not have the table, schema manager will throw an exception.
454+
msg = assertThrows(BigQueryConnectException.class, () -> new GcsToBqWriter(mockStorage(null), mockBigQuery(1, false), mockSchemaManager(null, null),
455+
retries, retryWaitMs, true, attemptSchemaUpdate, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
456+
"no table, schema manager exception"
457+
).getMessage();
458+
assertEquals("Operation failed during executeWithRetry: SchemaManager create table failed", msg);
459+
460+
// BigQuery does not have the table on the first call, but will on second call. Schema manager will return false
461+
assertDoesNotThrow(() ->
462+
new GcsToBqWriter(mockStorage(null), mockBigQuery(1, true), mockSchemaManager(false, null),
463+
retries, retryWaitMs, true, attemptSchemaUpdate, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
464+
"not table then table, schema manager did not create table");
465+
466+
assertDoesNotThrow(() -> new GcsToBqWriter(mockStorage(null), mockBigQuery(1, true), mockSchemaManager(true, null),
467+
retries, retryWaitMs, true, attemptSchemaUpdate, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
468+
"not table then table, schema manager did create table");
469+
470+
// BigQuery does not have the table on the first call, but will on second call. Schema manager should not be called, any call to the schema manager will result in an exception.
471+
assertDoesNotThrow(() -> new GcsToBqWriter(mockStorage(null), mockBigQuery(1, true), mockSchemaManager(null, null),
472+
retries, retryWaitMs, false, attemptSchemaUpdate, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
473+
"not table then table, schema manager exception");
474+
475+
}
476+
477+
@Test
478+
void writeRowsUpdateSchemaTest() {
479+
final int retries = 4;
480+
final long retryWaitMs = 100L;
481+
482+
Time mockTime = new MockTime(); // virtual clock; sleep() advances time but doesn’t block
483+
484+
// schema update throws exception.
485+
String msg = assertThrows(BigQueryConnectException.class, () -> new GcsToBqWriter(mockStorage(null), mockBigQuery(0, true),
486+
mockSchemaManager(null, null),
487+
retries, retryWaitMs, false, true, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
488+
"schema manager update failed"
489+
).getMessage();
490+
assertEquals("Operation failed during executeWithRetry: SchemaManager threw exception", msg);
491+
492+
// schema update returns false
493+
msg = assertThrows(BigQueryConnectException.class, () -> new GcsToBqWriter(mockStorage(null), mockBigQuery(0, true),
494+
mockSchemaManager(null, false),
495+
retries, retryWaitMs, false, true, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
496+
"schema manager update failed"
497+
).getMessage();
498+
assertEquals("Operation failed during executeWithRetry: SchemaManager schema update failed", msg);
499+
500+
final SchemaManager schemaManager = mock(SchemaManager.class);
501+
doThrow(new BigQueryException(500, "it failed")).when(schemaManager).updateSchema(any(), anyList());
502+
msg = assertThrows(BigQueryConnectException.class,() -> new GcsToBqWriter(mockStorage(null), mockBigQuery(0, true), schemaManager,
503+
retries, retryWaitMs, false, true, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
504+
"schema manager update faild with timeout").getMessage();
505+
assertTrue(msg.startsWith("Timeout expired after "));
506+
507+
// schema update returns true
508+
assertDoesNotThrow(() -> new GcsToBqWriter(mockStorage(null), mockBigQuery(0, true), mockSchemaManager(null, true),
509+
retries, retryWaitMs, false, true, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
510+
"schema manager update succeeded"
511+
);
512+
}
513+
514+
@Test
515+
void writeRowsUploadData() {
516+
final int retries = 4;
517+
final long retryWaitMs = 100L;
518+
519+
Time mockTime = new MockTime(); // virtual clock; sleep() advances time but doesn’t block
520+
521+
BigQuery bq = mockBigQuery(0, true);
522+
523+
// storage succeeds.
524+
assertDoesNotThrow(() -> new GcsToBqWriter(mockStorage(null), bq, null,
525+
retries, retryWaitMs, false, false, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
526+
"upload succeeded"
527+
);
528+
529+
// storage throws retryable error
530+
String msg = assertThrows(BigQueryConnectException.class, () -> new GcsToBqWriter(mockStorage(true), mockBigQuery(0, true), mockSchemaManager(null, true),
531+
retries, retryWaitMs, false, false, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
532+
"upload failed -- retry"
533+
).getMessage();
534+
assertTrue(msg.startsWith("Timeout expired after"));
535+
536+
// storage throws non-retryable error.
537+
msg = assertThrows(BigQueryConnectException.class, () -> new GcsToBqWriter(mockStorage(false), mockBigQuery(0, true), mockSchemaManager(null, true),
538+
retries, retryWaitMs, false, false, mockTime).writeRows(oneRow(), tableId, "bucket", "blob"),
539+
"upload failed -- no retry"
540+
).getMessage();
541+
assertEquals("Non-retryable exception on attempt 1.", msg);
542+
}
543+
355544
@Nested
356545
@DisplayName("JSON serialization (Gson / ByteBuffer)")
357546
class JsonSerializationTests {

0 commit comments

Comments
 (0)