diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
index bf4b43d6d..8c66a15d7 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
@@ -173,10 +173,14 @@ private RowToInsert getRecordRow(SinkRecord record) {
     if (config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)) {
       convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);
     }
-    return RowToInsert.of(getRowId(record), convertedRecord);
+    if (config.getBoolean(config.BEST_EFFORT_DEDUPLICATION_CONFIG)) {
+      return RowToInsert.of(getRowId(record), convertedRecord);
+    } else {
+      return RowToInsert.of(convertedRecord);
+    }
   }
 
-  private String getRowId(SinkRecord record) {
+  String getRowId(SinkRecord record) {
     return String.format("%s-%d-%d",
                          record.topic(),
                          record.kafkaPartition(),
diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java
index d4362c381..06ecce9b4 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java
@@ -233,6 +233,15 @@ public class BigQuerySinkConfig extends AbstractConfig {
   private static final String TABLE_CREATE_DOC =
       "Automatically create BigQuery tables if they don't already exist";
 
+  public static final String BEST_EFFORT_DEDUPLICATION_CONFIG = "bestEffortDeduplication";
+  private static final ConfigDef.Type BEST_EFFORT_DEDUPLICATION_TYPE = ConfigDef.Type.BOOLEAN;
+  public static final Boolean BEST_EFFORT_DEDUPLICATION_DEFAULT = true;
+  private static final ConfigDef.Importance BEST_EFFORT_DEDUPLICATION_IMPORTANCE =
+      ConfigDef.Importance.MEDIUM;
+  private static final String BEST_EFFORT_DEDUPLICATION_DOC =
+      "If false, Big Query best effort de-duplication will be disabled, which increases "
+      + "the streaming ingest quota, at the expense of not checking for duplicates";
+
   static {
     config = new ConfigDef()
         .define(
@@ -365,7 +374,13 @@ public class BigQuerySinkConfig extends AbstractConfig {
             TABLE_CREATE_DEFAULT,
             TABLE_CREATE_IMPORTANCE,
             TABLE_CREATE_DOC
-        );
+        ).define(
+            BEST_EFFORT_DEDUPLICATION_CONFIG,
+            BEST_EFFORT_DEDUPLICATION_TYPE,
+            BEST_EFFORT_DEDUPLICATION_DEFAULT,
+            BEST_EFFORT_DEDUPLICATION_IMPORTANCE,
+            BEST_EFFORT_DEDUPLICATION_DOC
+        );
   }
   /**
    * Throw an exception if the passed-in properties do not constitute a valid sink.
diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java
index af6547568..df9b3d8d7 100644
--- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java
+++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java
@@ -24,6 +24,7 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -98,6 +99,37 @@ public void testSimplePut() {
     verify(bigQuery, times(1)).insertAll(any(InsertAllRequest.class));
   }
 
+  @Test
+  public void testPutWhenBestEffortDeduplicationIsSetToFalse() {
+    final String topic = "test-topic";
+
+    Map<String, String> properties = propertiesFactory.getProperties();
+    properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic);
+    properties.put(BigQuerySinkConfig.DATASETS_CONFIG, ".*=scratch");
+    properties.put(BigQuerySinkTaskConfig.BEST_EFFORT_DEDUPLICATION_CONFIG, "false");
+
+    BigQuery bigQuery = mock(BigQuery.class);
+    Storage storage = mock(Storage.class);
+
+    SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
+    InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
+
+    when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
+    when(insertAllResponse.hasErrors()).thenReturn(false);
+
+    SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
+    SchemaManager schemaManager = mock(SchemaManager.class);
+
+    BigQuerySinkTask testTask = spy(new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager));
+    testTask.initialize(sinkTaskContext);
+    testTask.start(properties);
+
+    testTask.put(Collections.singletonList(spoofSinkRecord(topic)));
+    testTask.flush(Collections.emptyMap());
+    verify(bigQuery, times(1)).insertAll(any(InsertAllRequest.class));
+    verify(testTask, times(0)).getRowId(any(SinkRecord.class));
+  }
+
   @Test
   public void testSimplePutWhenSchemaRetrieverIsNotNull() {
     final String topic = "test-topic";