Skip to content

Commit 6b1d4d4

Browse files
author
Alejandro del Castillo
committed
add bestEffortDeduplication config option
The connector is currently adding an InsertId per row, which is used by BigQuery to dedupe rows that have the same insertId (within a 1-minute window). Using insertIds throttles the ingestion rate to a maximum of 100k rows per second and 100 MB/s. Insertions without an insertId disable best-effort de-duplication [1], which increases the ingestion quota to a maximum of 1 GB/s. For high-throughput applications, it's desirable to disable dedupe and handle duplication on the query side. [1] https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication Signed-off-by: Alejandro del Castillo <[email protected]>
1 parent 0483de1 commit 6b1d4d4

File tree

3 files changed

+54
-3
lines changed

3 files changed

+54
-3
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,14 @@ private RowToInsert getRecordRow(SinkRecord record) {
173173
if (config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)) {
174174
convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);
175175
}
176-
return RowToInsert.of(getRowId(record), convertedRecord);
176+
if (config.getBoolean(config.BEST_EFFORT_DEDUPLICATION_CONFIG)) {
177+
return RowToInsert.of(getRowId(record), convertedRecord);
178+
} else {
179+
return RowToInsert.of(convertedRecord);
180+
}
177181
}
178182

179-
private String getRowId(SinkRecord record) {
183+
String getRowId(SinkRecord record) {
180184
return String.format("%s-%d-%d",
181185
record.topic(),
182186
record.kafkaPartition(),

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,15 @@ public class BigQuerySinkConfig extends AbstractConfig {
233233
private static final String TABLE_CREATE_DOC =
234234
"Automatically create BigQuery tables if they don't already exist";
235235

236+
public static final String BEST_EFFORT_DEDUPLICATION_CONFIG = "bestEffortDeduplication";
237+
private static final ConfigDef.Type BEST_EFFORT_DEDUPLICATION_TYPE = ConfigDef.Type.BOOLEAN;
238+
public static final Boolean BEST_EFFORT_DEDUPLICATION_DEFAULT = true;
239+
private static final ConfigDef.Importance BEST_EFFORT_DEDUPLICATION_IMPORTANCE =
240+
ConfigDef.Importance.MEDIUM;
241+
private static final String BEST_EFFORT_DEDUPLICATION_DOC =
242+
"If false, Big Query best effort de-duplication will be disabled, which increases "
243+
+ "the streaming ingest quota, at the expense of not checking for duplicates";
244+
236245
static {
237246
config = new ConfigDef()
238247
.define(
@@ -365,7 +374,13 @@ public class BigQuerySinkConfig extends AbstractConfig {
365374
TABLE_CREATE_DEFAULT,
366375
TABLE_CREATE_IMPORTANCE,
367376
TABLE_CREATE_DOC
368-
);
377+
).define(
378+
BEST_EFFORT_DEDUPLICATION_CONFIG,
379+
BEST_EFFORT_DEDUPLICATION_TYPE,
380+
BEST_EFFORT_DEDUPLICATION_DEFAULT,
381+
BEST_EFFORT_DEDUPLICATION_IMPORTANCE,
382+
BEST_EFFORT_DEDUPLICATION_DOC
383+
);
369384
}
370385
/**
371386
* Throw an exception if the passed-in properties do not constitute a valid sink.

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.Matchers.any;
2525
import static org.mockito.Matchers.anyObject;
2626
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.spy;
2728
import static org.mockito.Mockito.times;
2829
import static org.mockito.Mockito.verify;
2930
import static org.mockito.Mockito.when;
@@ -98,6 +99,37 @@ public void testSimplePut() {
9899
verify(bigQuery, times(1)).insertAll(any(InsertAllRequest.class));
99100
}
100101

102+
@Test
103+
public void testPutWhenBestEffortDeduplicationIsSetToFalse() {
104+
final String topic = "test-topic";
105+
106+
Map<String, String> properties = propertiesFactory.getProperties();
107+
properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic);
108+
properties.put(BigQuerySinkConfig.DATASETS_CONFIG, ".*=scratch");
109+
properties.put(BigQuerySinkTaskConfig.BEST_EFFORT_DEDUPLICATION_CONFIG, "false");
110+
111+
BigQuery bigQuery = mock(BigQuery.class);
112+
Storage storage = mock(Storage.class);
113+
114+
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
115+
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
116+
117+
when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
118+
when(insertAllResponse.hasErrors()).thenReturn(false);
119+
120+
SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
121+
SchemaManager schemaManager = mock(SchemaManager.class);
122+
123+
BigQuerySinkTask testTask = spy(new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager));
124+
testTask.initialize(sinkTaskContext);
125+
testTask.start(properties);
126+
127+
testTask.put(Collections.singletonList(spoofSinkRecord(topic)));
128+
testTask.flush(Collections.emptyMap());
129+
verify(bigQuery, times(1)).insertAll(any(InsertAllRequest.class));
130+
verify(testTask, times(0)).getRowId(any(SinkRecord.class));
131+
}
132+
101133
@Test
102134
public void testSimplePutWhenSchemaRetrieverIsNotNull() {
103135
final String topic = "test-topic";

0 commit comments

Comments
 (0)