Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,11 @@ public void put(Collection<SinkRecord> records) {
}
tableWriterBuilders.put(table, tableWriterBuilder);
}
tableWriterBuilders.get(table).addRow(record);

if (config.getBoolean(BigQuerySinkTaskConfig.BIGQUERY_IGNORE_INSERT_ID_CONFIG))
tableWriterBuilders.get(table).addRowWithoutId(record);
else
tableWriterBuilders.get(table).addRow(record);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
private static final String BIGQUERY_CLUSTERING_FIELD_NAMES_DOC =
"List of fields on which data should be clustered by in BigQuery, separated by commas";

// Config key controlling whether rows are sent to BigQuery without an insert id.
public static final String BIGQUERY_IGNORE_INSERT_ID_CONFIG = "bigQueryIgnoreInsertId";
private static final ConfigDef.Type BIGQUERY_IGNORE_INSERT_ID_TYPE = ConfigDef.Type.BOOLEAN;
// Default is false so existing deployments keep BigQuery's best-effort deduplication.
private static final Boolean BIGQUERY_IGNORE_INSERT_ID_DEFAULT = false;
private static final ConfigDef.Importance BIGQUERY_IGNORE_INSERT_ID_IMPORTANCE = ConfigDef.Importance.LOW;
private static final String BIGQUERY_IGNORE_INSERT_ID_DOC =
    "If true, rows are not deduplicated by BigQuery. This is the recommended way to " +
    "get higher streaming ingest quotas in certain regions.";

static {
config = BigQuerySinkConfig.getConfig()
.define(
Expand Down Expand Up @@ -174,6 +182,12 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
BIGQUERY_CLUSTERING_FIELD_NAMES_DEFAULT,
BIGQUERY_CLUSTERING_FIELD_NAMES_IMPORTANCE,
BIGQUERY_CLUSTERING_FIELD_NAMES_DOC
).define(
BIGQUERY_IGNORE_INSERT_ID_CONFIG,
BIGQUERY_IGNORE_INSERT_ID_TYPE,
BIGQUERY_IGNORE_INSERT_ID_DEFAULT,
BIGQUERY_IGNORE_INSERT_ID_IMPORTANCE,
BIGQUERY_IGNORE_INSERT_ID_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public SinkRecordConverter(RecordConverter<Map<String, Object>> recordConverter,
this.kafkaDataFieldName = kafkaDataFieldName;
}

public InsertAllRequest.RowToInsert getRecordRow(SinkRecord record) {
public InsertAllRequest.RowToInsert getRecordRow(SinkRecord record, boolean ignoreRecordId) {
Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
if (kafkaKeyFieldName.isPresent()) {
convertedRecord.put(kafkaKeyFieldName.get(), recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY));
Expand All @@ -53,7 +53,12 @@ public InsertAllRequest.RowToInsert getRecordRow(SinkRecord record) {
if (sanitizeFieldName) {
convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);
}
return InsertAllRequest.RowToInsert.of(getRowId(record), convertedRecord);

String rowId = getRowId(record);
if(ignoreRecordId)
rowId = null;

return InsertAllRequest.RowToInsert.of(rowId, convertedRecord);
}

private String getRowId(SinkRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,16 @@ public Builder setBlobName(String blobName) {
* @param record the row to add
*/
public void addRow(SinkRecord record) {
  // false: keep the row's insert id so BigQuery can deduplicate the row.
  rows.put(record, recordConverter.getRecordRow(record, false));
}

/**
 * Adds a record to this builder, omitting the row's insert id so that
 * BigQuery performs no best-effort deduplication for it.
 *
 * @param record the record to add
 */
public void addRowWithoutId(SinkRecord record) {
  // true: drop the insert id when converting the record.
  this.rows.put(record, this.recordConverter.getRecordRow(record, true));
}

public GCSBatchTableWriter build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,16 @@ public Builder(BigQueryWriter writer, PartitionedTableId table, SinkRecordConver
* @param record the row to add
*/
public void addRow(SinkRecord record) {
  // false: keep the row's insert id so BigQuery can deduplicate the row.
  rows.put(record, recordConverter.getRecordRow(record, false));
}

/**
 * Adds a record to this builder without an insert id, so that BigQuery
 * does not attempt to deduplicate the resulting row.
 *
 * @param record the record to add
 */
public void addRowWithoutId(SinkRecord record) {
  // true requests conversion with the insert id omitted.
  this.rows.put(record, this.recordConverter.getRecordRow(record, true));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public interface TableWriterBuilder {
*/
void addRow(SinkRecord sinkRecord);

/**
 * Add a record to the builder without a row insert id, so BigQuery
 * performs no best-effort deduplication for it.
 *
 * @param sinkRecord the record to add
 */
void addRowWithoutId(SinkRecord sinkRecord);

/**
* Create a {@link TableWriter} from this builder.
* @return a TableWriter containing the given writer, table, topic, and all added rows.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
*/


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,12 @@ public void testSchemaUpdatesWithoutRetriever() {

new BigQuerySinkTaskConfig(badConfigProperties);
}

/** Verifies that {@code bigQueryIgnoreInsertId} defaults to {@code false}. */
@Test
public void testDefaultIgnoreIdSetting() {
  Map<String, String> configProperties = propertiesFactory.getProperties();
  BigQuerySinkTaskConfig testConfig = new BigQuerySinkTaskConfig(configProperties);
  assertFalse(testConfig.getBoolean(BigQuerySinkTaskConfig.BIGQUERY_IGNORE_INSERT_ID_CONFIG));
}
}