diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index b338e27b0..11f05a0af 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -211,7 +211,11 @@ public void put(Collection records) { } tableWriterBuilders.put(table, tableWriterBuilder); } - tableWriterBuilders.get(table).addRow(record); + + if (config.getBoolean(BigQuerySinkTaskConfig.BIGQUERY_IGNORE_INSERT_ID_CONFIG)) + tableWriterBuilders.get(table).addRowWithoutId(record); + else + tableWriterBuilders.get(table).addRow(record); } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java index 9b395700b..84e074048 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java @@ -120,6 +120,14 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig { private static final String BIGQUERY_CLUSTERING_FIELD_NAMES_DOC = "List of fields on which data should be clustered by in BigQuery, separated by commas"; + public static final String BIGQUERY_IGNORE_INSERT_ID_CONFIG = "bigQueryIgnoreInsertId"; + private static final ConfigDef.Type BIGQUERY_IGNORE_INSERT_ID_TYPE = ConfigDef.Type.BOOLEAN; + private static final Boolean BIGQUERY_IGNORE_INSERT_ID_DEFAULT = false; + private static final ConfigDef.Importance BIGQUERY_IGNORE_INSERT_ID_IMPORTANCE = ConfigDef.Importance.LOW; + private static final String BIGQUERY_IGNORE_INSERT_ID_DOC = + "If true, rows are not deduplicated by BigQuery. This is the recommended way to " + + " get higher streaming ingest quotas in certain regions."; + static { config = BigQuerySinkConfig.getConfig() .define( @@ -174,6 +182,12 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig { BIGQUERY_CLUSTERING_FIELD_NAMES_DEFAULT, BIGQUERY_CLUSTERING_FIELD_NAMES_IMPORTANCE, BIGQUERY_CLUSTERING_FIELD_NAMES_DOC + ).define( + BIGQUERY_IGNORE_INSERT_ID_CONFIG, + BIGQUERY_IGNORE_INSERT_ID_TYPE, + BIGQUERY_IGNORE_INSERT_ID_DEFAULT, + BIGQUERY_IGNORE_INSERT_ID_IMPORTANCE, + BIGQUERY_IGNORE_INSERT_ID_DOC ); } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java index 96cd2144b..a0ebacc30 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SinkRecordConverter.java @@ -42,7 +42,7 @@ public SinkRecordConverter(RecordConverter> recordConverter, this.kafkaDataFieldName = kafkaDataFieldName; } - public InsertAllRequest.RowToInsert getRecordRow(SinkRecord record) { + public InsertAllRequest.RowToInsert getRecordRow(SinkRecord record, boolean ignoreRecordId) { Map convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE); if (kafkaKeyFieldName.isPresent()) { convertedRecord.put(kafkaKeyFieldName.get(), recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY)); @@ -53,7 +53,12 @@ public InsertAllRequest.RowToInsert getRecordRow(SinkRecord record) { if (sanitizeFieldName) { convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord); } - return InsertAllRequest.RowToInsert.of(getRowId(record), convertedRecord); + + String rowId = getRowId(record); + if(ignoreRecordId) + rowId = null; + + return InsertAllRequest.RowToInsert.of(rowId, convertedRecord); } private String getRowId(SinkRecord record) { diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java index 45d12e1dd..4451d5b4d 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java @@ -129,7 +129,16 @@ public Builder setBlobName(String blobName) { * @param record the row to add */ public void addRow(SinkRecord record) { - rows.put(record, recordConverter.getRecordRow(record)); + rows.put(record, recordConverter.getRecordRow(record, false)); + } + + /** + * Add a record to the builder ignoring the id of the row. + * + * @param record the row to add + */ + public void addRowWithoutId(SinkRecord record) { + rows.put(record, recordConverter.getRecordRow(record, true)); } public GCSBatchTableWriter build() { diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java index dee011142..5f8aa0d31 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java @@ -172,7 +172,16 @@ public Builder(BigQueryWriter writer, PartitionedTableId table, SinkRecordConver * @param record the row to add */ public void addRow(SinkRecord record) { - rows.put(record, recordConverter.getRecordRow(record)); + rows.put(record, recordConverter.getRecordRow(record, false)); + } + + /** + * Add a record to the builder ignoring the id of the row. + * + * @param record the row to add + */ + public void addRowWithoutId(SinkRecord record) { + rows.put(record, recordConverter.getRecordRow(record, true)); } /** diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java index 556ae98e3..258b8f3be 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java @@ -31,6 +31,13 @@ public interface TableWriterBuilder { */ void addRow(SinkRecord sinkRecord); + /** + * Add a record to the builder ignoring the id of the row. + * + * @param record the row to add + */ + void addRowWithoutId(SinkRecord record); + /** * Create a {@link TableWriter} from this builder. * @return a TableWriter containing the given writer, table, topic, and all added rows. diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index 358a3c8ab..16d14d818 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -18,9 +18,7 @@ */ -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfigTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfigTest.java index f751afec5..771e9a7ac 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfigTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfigTest.java @@ -172,4 +172,12 @@ public void testSchemaUpdatesWithoutRetriever() { new BigQuerySinkTaskConfig(badConfigProperties); } + + /** Test that the default setting for ignore row id is false */ + @Test + public void testDefaultIgnoreIdSetting() { + Map properties = propertiesFactory.getProperties(); + BigQuerySinkTaskConfig config = new BigQuerySinkTaskConfig(properties); + assertFalse(config.getBoolean(BigQuerySinkTaskConfig.BIGQUERY_IGNORE_INSERT_ID_CONFIG)); + } }