Skip to content

Commit 2c165e1

Browse files
author
Bingqin Zhou
committed
Modify convertRecord to avoid boolean to be passed in.
1 parent d5b2389 commit 2c165e1

File tree

4 files changed

+40
-38
lines changed

4 files changed

+40
-38
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.kafka.common.TopicPartition;
4949
import org.apache.kafka.common.config.ConfigException;
5050
import org.apache.kafka.common.record.TimestampType;
51+
import org.apache.kafka.connect.data.Schema;
5152
import org.apache.kafka.connect.errors.ConnectException;
5253
import org.apache.kafka.connect.sink.SinkRecord;
5354
import org.apache.kafka.connect.sink.SinkTask;
@@ -151,9 +152,9 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
151152
}
152153

153154
private RowToInsert getRecordRow(SinkRecord record) {
154-
Map<String, Object> convertedRecord = recordConverter.convertRecord(record, false);
155+
Map<String, Object> convertedRecord = recordConverter.convertRecord(record.valueSchema(), record.value());
155156
if (config.getBoolean(config.INCLUDE_KAFKA_KEY_CONFIG)) {
156-
convertedRecord.put(config.KAFKA_KEY_FIELD_NAME_CONFIG, recordConverter.convertRecord(record, true));
157+
convertedRecord.put(config.KAFKA_KEY_FIELD_NAME_CONFIG, recordConverter.convertRecord(record.keySchema(), record.key()));
157158
}
158159
if (config.getBoolean(config.INCLUDE_KAFKA_DATA_CONFIG)) {
159160
convertedRecord.put(config.KAFKA_DATA_FIELD_NAME_CONFIG, KafkaDataBuilder.getKafkaDataRecord(record));

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,15 @@ public BigQueryRecordConverter(boolean shouldConvertDoubleSpecial) {
6565

6666
/**
6767
* Convert a {@link SinkRecord} into the contents of a BigQuery {@link RowToInsert}.
68-
*
69-
* @param kafkaConnectRecord The Kafka Connect record to convert. Must be of type {@link Struct},
68+
* @param kafkaConnectSchema The schema of the kafka connect record to convert. Must be of type {@link Struct},
69+
* in order to translate into a row format that requires each field to
70+
* consist of both a name and a value.
71+
* @param kafkaConnectStruct The struct of the kafka connect record to convert. Must be of type {@link Struct},
7072
* in order to translate into a row format that requires each field to
7173
* consist of both a name and a value.
7274
* @return The result BigQuery row content.
7375
*/
74-
public Map<String, Object> convertRecord(SinkRecord kafkaConnectRecord, boolean convertKey) {
75-
Schema kafkaConnectSchema = convertKey ? kafkaConnectRecord.keySchema() : kafkaConnectRecord.valueSchema();
76-
Object kafkaConnectStruct = convertKey ? kafkaConnectRecord.key() : kafkaConnectRecord.value();
76+
public Map<String, Object> convertRecord(Schema kafkaConnectSchema, Object kafkaConnectStruct) {
7777
if (kafkaConnectSchema == null) {
7878
if (kafkaConnectStruct instanceof Map) {
7979
return (Map<String, Object>) convertSchemalessRecord(kafkaConnectStruct);
@@ -172,8 +172,7 @@ private Object convertObject(Object kafkaConnectObject, Schema kafkaConnectSchem
172172
}
173173
}
174174

175-
private Map<String, Object> convertStruct(Object kafkaConnectObject,
176-
Schema kafkaConnectSchema) {
175+
private Map<String, Object> convertStruct(Object kafkaConnectObject, Schema kafkaConnectSchema) {
177176
Map<String, Object> bigQueryRecord = new HashMap<>();
178177
List<Field> kafkaConnectSchemaFields = kafkaConnectSchema.fields();
179178
Struct kafkaConnectStruct = (Struct) kafkaConnectObject;

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/RecordConverter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020

21+
import org.apache.kafka.connect.data.Schema;
2122
import org.apache.kafka.connect.sink.SinkRecord;
2223

2324
/**
@@ -26,9 +27,10 @@
2627
*/
2728
public interface RecordConverter<R> {
2829
/**
29-
* @param record The record to convert.
30+
* @param schema The schema of the record to convert.
31+
* @param struct The struct of the record to convert.
3032
* @return The converted record.
3133
*/
32-
R convertRecord(SinkRecord record, boolean convertKey);
34+
R convertRecord(Schema schema, Object struct);
3335

3436
}

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverterTest.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class BigQueryRecordConverterTest {
4747
@Test(expected = ConversionConnectException.class)
4848
public void testTopLevelRecord() {
4949
SinkRecord kafkaConnectRecord = spoofSinkRecord(Schema.BOOLEAN_SCHEMA, false, false);
50-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
50+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
5151
}
5252

5353
@Test
@@ -68,7 +68,7 @@ public void testBoolean() {
6868
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true);
6969

7070
Map<String, Object> bigQueryTestRecord =
71-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
71+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
7272
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
7373
}
7474

@@ -90,7 +90,7 @@ public void testInteger() {
9090
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false);
9191

9292
Map<String, Object> bigQueryTestRecord =
93-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
93+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
9494
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
9595

9696
final Short fieldShortValue = (short) 4242;
@@ -107,7 +107,7 @@ public void testInteger() {
107107
kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true);
108108

109109
bigQueryTestRecord =
110-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
110+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
111111
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
112112

113113
final Integer fieldIntegerValue = 424242;
@@ -124,7 +124,7 @@ public void testInteger() {
124124
kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false);
125125

126126
bigQueryTestRecord =
127-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
127+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
128128
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
129129

130130
final Long fieldLongValue = 424242424242L;
@@ -141,7 +141,7 @@ public void testInteger() {
141141
kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true);
142142

143143
bigQueryTestRecord =
144-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
144+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
145145
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
146146
}
147147

@@ -162,7 +162,7 @@ public void testInteger() {
162162
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false);
163163

164164
Map<String, Object> bigQueryTestRecord =
165-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
165+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
166166
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
167167

168168
final Double fieldDoubleValue = 4242424242.4242;
@@ -180,7 +180,7 @@ public void testInteger() {
180180
kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true);
181181

182182
bigQueryTestRecord =
183-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
183+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
184184
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
185185
}
186186

@@ -207,7 +207,7 @@ public void testInteger() {
207207
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false);
208208

209209
Map<String, Object> bigQueryTestRecord =
210-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
210+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
211211
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
212212
}
213213
}
@@ -230,7 +230,7 @@ public void testString() {
230230
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true);
231231

232232
Map<String, Object> bigQueryTestRecord =
233-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
233+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
234234
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
235235
}
236236

@@ -263,7 +263,7 @@ public void testStruct() {
263263
spoofSinkRecord(kafkaConnectInnerSchema, kafkaConnectInnerStruct, false);
264264
Map<String, Object> bigQueryTestInnerRecord =
265265
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE)
266-
.convertRecord(kafkaConnectInnerSinkRecord, false);
266+
.convertRecord(kafkaConnectInnerSinkRecord.valueSchema(), kafkaConnectInnerSinkRecord.value());
267267
assertEquals(bigQueryExpectedInnerRecord, bigQueryTestInnerRecord);
268268

269269

@@ -285,7 +285,7 @@ public void testStruct() {
285285
spoofSinkRecord(kafkaConnectMiddleSchema, kafkaConnectMiddleStruct, true);
286286
Map<String, Object> bigQueryTestMiddleRecord =
287287
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE)
288-
.convertRecord(kafkaConnectMiddleSinkRecord, true);
288+
.convertRecord(kafkaConnectMiddleSinkRecord.keySchema(), kafkaConnectMiddleSinkRecord.key());
289289
assertEquals(bigQueryExpectedMiddleRecord, bigQueryTestMiddleRecord);
290290

291291

@@ -307,7 +307,7 @@ public void testStruct() {
307307
spoofSinkRecord(kafkaConnectOuterSchema, kafkaConnectOuterStruct, false);
308308
Map<String, Object> bigQueryTestOuterRecord =
309309
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE)
310-
.convertRecord(kafkaConnectOuterSinkRecord, false);
310+
.convertRecord(kafkaConnectOuterSinkRecord.valueSchema(), kafkaConnectOuterSinkRecord.value());
311311
assertEquals(bigQueryExpectedOuterRecord, bigQueryTestOuterRecord);
312312
}
313313

@@ -345,7 +345,7 @@ public void testMap() {
345345
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true);
346346

347347
Map<String, Object> bigQueryTestRecord =
348-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
348+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
349349
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
350350
}
351351

@@ -367,7 +367,7 @@ public void testIntegerArray() {
367367
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false);
368368

369369
Map<String, Object> bigQueryTestRecord =
370-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
370+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
371371
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
372372
}
373373

@@ -395,7 +395,7 @@ public void testStructArray() {
395395
spoofSinkRecord(kafkaConnectInnerSchema, kafkaConnectInnerStruct, true);
396396
Map<String, Object> bigQueryTestInnerRecord =
397397
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE)
398-
.convertRecord(kafkaConnectInnerSinkRecord, true);
398+
.convertRecord(kafkaConnectInnerSinkRecord.keySchema(), kafkaConnectInnerSinkRecord.key());
399399
assertEquals(bigQueryExpectedInnerRecord, bigQueryTestInnerRecord);
400400

401401
final String middleFieldArrayName = "MiddleArray";
@@ -415,7 +415,7 @@ public void testStructArray() {
415415
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false);
416416

417417
Map<String, Object> bigQueryTestRecord =
418-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
418+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
419419
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
420420
}
421421

@@ -438,7 +438,7 @@ public void testStringArray() {
438438
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true);
439439

440440
Map<String, Object> bigQueryTestRecord =
441-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
441+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
442442
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
443443
}
444444

@@ -462,7 +462,7 @@ public void testBytes() {
462462
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false);
463463

464464
Map<String, Object> bigQueryTestRecord =
465-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
465+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
466466
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
467467
}
468468

@@ -484,7 +484,7 @@ public void testDebeziumLogicalType() {
484484
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true);
485485

486486
Map<String, Object> bigQueryTestRecord =
487-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
487+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
488488
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
489489
}
490490

@@ -506,7 +506,7 @@ public void testKafkaLogicalType() {
506506
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false);
507507

508508
Map<String, Object> bigQueryTestRecord =
509-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
509+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
510510
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
511511

512512
}
@@ -533,7 +533,7 @@ public void testNullable() {
533533
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true);
534534

535535
Map<String, Object> bigQueryTestRecord =
536-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
536+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
537537
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
538538
}
539539

@@ -556,7 +556,7 @@ public void testNullableStruct() {
556556
SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false);
557557

558558
Map<String, Object> bigQueryTestRecord =
559-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
559+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
560560
assertEquals(bigQueryExpectedRecord, bigQueryTestRecord);
561561
}
562562

@@ -578,7 +578,7 @@ public void testValidMapSchemaless() {
578578

579579
SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, true);
580580
Map<String, Object> convertedMap =
581-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
581+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
582582
assertEquals(kafkaConnectMap, convertedMap);
583583
}
584584

@@ -600,7 +600,7 @@ public void testInvalidMapSchemaless() {
600600

601601
SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, false);
602602
Map<String, Object> convertedMap =
603-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
603+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
604604
}
605605

606606
@Test
@@ -621,7 +621,7 @@ public void testMapSchemalessConvertDouble() {
621621

622622
SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, true);
623623
Map<String, Object> convertedMap =
624-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, true);
624+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.keySchema(), kafkaConnectRecord.key());
625625
assertEquals(convertedMap.get("f1"), Double.MAX_VALUE);
626626
assertEquals(((Map)(convertedMap.get("f3"))).get("f4"), Double.MAX_VALUE);
627627
assertEquals(((ArrayList)((Map)(convertedMap.get("f3"))).get("f6")).get(1), Double.MAX_VALUE);
@@ -647,7 +647,7 @@ public void testMapSchemalessConvertBytes() {
647647

648648
SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, false);
649649
Map<String, Object> convertedMap =
650-
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, false);
650+
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord.valueSchema(), kafkaConnectRecord.value());
651651
assertEquals(convertedMap.get("f1"), Base64.getEncoder().encodeToString(helloWorld));
652652
assertEquals(((Map)(convertedMap.get("f3"))).get("f4"), Base64.getEncoder().encodeToString(helloWorld));
653653
}

0 commit comments

Comments
 (0)