Commit 37bcaa2

Author: Alejandro del Castillo (committed)
add bestEffortDeduplication config option
The connector currently adds an insertId to every row, which BigQuery uses to de-duplicate rows that share the same insertId within a 1-minute window. Using insertIds throttles the ingestion rate to a maximum of 100k rows per second and 100 MB/s. Insertions without an insertId disable best-effort de-duplication [1], which raises the ingestion quota to a maximum of 1 GB/s. For high-throughput applications, it is desirable to disable de-duplication and handle duplicates on the query side.

[1] https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication

Signed-off-by: Alejandro del Castillo <[email protected]>
1 parent 2797a48 commit 37bcaa2
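
As context for the commit message above, a hedged usage sketch (not part of this commit) of how a high-throughput deployment might turn the new option off when building its connector properties. The connector class is the one from this repository; the connector name and topic below are hypothetical placeholders.

import java.util.HashMap;
import java.util.Map;

public class SinkConfigSketch {
  public static void main(String[] args) {
    // Connector properties as they would be handed to Kafka Connect.
    Map<String, String> props = new HashMap<>();
    props.put("name", "bigquery-sink");                              // hypothetical connector name
    props.put("connector.class",
        "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector");
    props.put("topics", "events");                                   // hypothetical topic
    props.put("bestEffortDeduplication", "false");                   // new option: skip insertIds for the higher quota
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}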

File tree

2 files changed: +21 −2 lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 5 additions & 1 deletion
@@ -173,7 +173,11 @@ private RowToInsert getRecordRow(SinkRecord record) {
     if (config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)) {
       convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);
     }
-    return RowToInsert.of(getRowId(record), convertedRecord);
+    if (config.getBoolean(config.BEST_EFFORT_DEDUPLICATION_CONFIG)) {
+      return RowToInsert.of(getRowId(record), convertedRecord);
+    } else {
+      return RowToInsert.of(convertedRecord);
+    }
   }
 
   private String getRowId(SinkRecord record) {
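
For reference, a minimal standalone sketch (not from the patch) of the two RowToInsert factory methods the connector now chooses between; the row content and insertId below are hypothetical.

import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;

import java.util.HashMap;
import java.util.Map;

public class RowToInsertSketch {
  public static void main(String[] args) {
    Map<String, Object> content = new HashMap<>();
    content.put("field", "value"); // hypothetical row content

    // With an insertId: BigQuery de-duplicates rows sharing the same id
    // within a ~1 minute window, at the cost of the lower streaming quota.
    RowToInsert withId = RowToInsert.of("topic-0-42", content); // hypothetical id

    // Without an insertId: best-effort de-duplication is disabled and the
    // higher streaming ingest quota applies.
    RowToInsert withoutId = RowToInsert.of(content);

    System.out.println(withId.getId());    // topic-0-42
    System.out.println(withoutId.getId()); // null
  }
}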

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java

Lines changed: 16 additions & 1 deletion
@@ -233,6 +233,15 @@ public class BigQuerySinkConfig extends AbstractConfig {
   private static final String TABLE_CREATE_DOC =
       "Automatically create BigQuery tables if they don't already exist";
 
+  public static final String BEST_EFFORT_DEDUPLICATION_CONFIG = "bestEffortDeduplication";
+  private static final ConfigDef.Type BEST_EFFORT_DEDUPLICATION_TYPE = ConfigDef.Type.BOOLEAN;
+  public static final Boolean BEST_EFFORT_DEDUPLICATION_DEFAULT = true;
+  private static final ConfigDef.Importance BEST_EFFORT_DEDUPLICATION_IMPORTANCE =
+      ConfigDef.Importance.MEDIUM;
+  private static final String BEST_EFFORT_DEDUPLICATION_DOC =
+      "If false, Big Query best effort de-duplication will be disabled, which increases "
+      + "the streaming ingest quota, at the expense of not checking for duplicates";
+
   static {
     config = new ConfigDef()
         .define(
@@ -365,7 +374,13 @@ public class BigQuerySinkConfig extends AbstractConfig {
             TABLE_CREATE_DEFAULT,
             TABLE_CREATE_IMPORTANCE,
             TABLE_CREATE_DOC
-        );
+        ).define(
+            BEST_EFFORT_DEDUPLICATION_CONFIG,
+            BEST_EFFORT_DEDUPLICATION_TYPE,
+            BEST_EFFORT_DEDUPLICATION_DEFAULT,
+            BEST_EFFORT_DEDUPLICATION_IMPORTANCE,
+            BEST_EFFORT_DEDUPLICATION_DOC
+        );
   }
 
   /**
    * Throw an exception if the passed-in properties do not constitute a valid sink.
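
To show the pattern this config change extends, here is a minimal, self-contained sketch (assumption: only Kafka's ConfigDef/AbstractConfig APIs; the wrapper class and main harness are hypothetical, while the option name mirrors the diff) of defining a boolean option and reading it back with getBoolean.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class DedupConfigSketch extends AbstractConfig {
  static final String BEST_EFFORT_DEDUPLICATION_CONFIG = "bestEffortDeduplication";

  // Same define(name, type, default, importance, doc) pattern the diff adds
  // to BigQuerySinkConfig, reduced to a single option.
  static final ConfigDef CONFIG = new ConfigDef()
      .define(
          BEST_EFFORT_DEDUPLICATION_CONFIG,
          ConfigDef.Type.BOOLEAN,
          true,                          // default keeps insertIds (dedupe on)
          ConfigDef.Importance.MEDIUM,
          "If false, disable BigQuery best-effort de-duplication");

  DedupConfigSketch(Map<String, String> props) {
    super(CONFIG, props);
  }

  public static void main(String[] args) {
    DedupConfigSketch cfg = new DedupConfigSketch(
        Collections.singletonMap(BEST_EFFORT_DEDUPLICATION_CONFIG, "false"));
    System.out.println(cfg.getBoolean(BEST_EFFORT_DEDUPLICATION_CONFIG)); // prints: false
  }
}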
