Skip to content

Commit cf01bf6

Browse files
author
Bingqin Zhou
committed
Fix getting some configs.
1 parent 2c165e1 commit cf01bf6

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,10 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
154154
private RowToInsert getRecordRow(SinkRecord record) {
155155
Map<String, Object> convertedRecord = recordConverter.convertRecord(record.valueSchema(), record.value());
156156
if (config.getBoolean(config.INCLUDE_KAFKA_KEY_CONFIG)) {
157-
convertedRecord.put(config.KAFKA_KEY_FIELD_NAME_CONFIG, recordConverter.convertRecord(record.keySchema(), record.key()));
157+
convertedRecord.put(config.getString(config.KAFKA_KEY_FIELD_NAME_CONFIG), recordConverter.convertRecord(record.keySchema(), record.key()));
158158
}
159159
if (config.getBoolean(config.INCLUDE_KAFKA_DATA_CONFIG)) {
160-
convertedRecord.put(config.KAFKA_DATA_FIELD_NAME_CONFIG, KafkaDataBuilder.getKafkaDataRecord(record));
160+
convertedRecord.put(config.getString(config.KAFKA_DATA_FIELD_NAME_CONFIG), KafkaDataBuilder.getKafkaDataRecord(record));
161161
}
162162
if (config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)) {
163163
convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);

0 commit comments

Comments
 (0)