Skip to content

Commit fb7263e

Browse files
avocadermtagle
authored andcommitted
CC-7399: Handle null value if schema-less record has a map with a null value. (#220)
1 parent f7c1518 commit fb7263e

File tree

2 files changed

+46
-14
lines changed

2 files changed

+46
-14
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -110,22 +110,22 @@ private Object convertSchemalessRecord(Object value) {
110110
}
111111
if (value instanceof Map) {
112112
return
113-
((Map<Object, Object>) value).entrySet().stream().collect(
114-
Collectors.toMap(
115-
entry -> {
116-
if (!(entry.getKey() instanceof String)) {
117-
throw new ConversionConnectException(
118-
"Failed to convert record to bigQuery format: " +
119-
"Map objects in absence of schema needs to have string value keys. ");
120-
}
121-
return entry.getKey();
122-
},
123-
entry -> convertSchemalessRecord(entry.getValue())
124-
)
125-
);
113+
((Map<Object, Object>) value)
114+
.entrySet()
115+
.stream()
116+
.collect(HashMap::new,
117+
(m, e) -> {
118+
if (!(e.getKey() instanceof String)) {
119+
throw new ConversionConnectException(
120+
"Failed to convert record to bigQuery format: " +
121+
"Map objects in absence of schema needs to have string value keys. ");
122+
}
123+
m.put(e.getKey(), convertSchemalessRecord(e.getValue()));
124+
},
125+
HashMap::putAll);
126126
}
127127
throw new ConversionConnectException("Unsupported class " + value.getClass() +
128-
" found in schemaless record data. Can't convert record to bigQuery format");
128+
" found in schemaless record data. Can't convert record to bigQuery format");
129129
}
130130

131131
@SuppressWarnings("unchecked")

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverterTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import org.apache.kafka.connect.sink.SinkRecord;
3131

32+
import org.junit.Assert;
3233
import org.junit.Test;
3334

3435
import java.nio.ByteBuffer;
@@ -604,6 +605,37 @@ public void testInvalidMapSchemaless() {
604605
new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE);
605606
}
606607

608+
@Test
609+
public void testInvalidMapSchemalessNullValue() {
610+
Map kafkaConnectMap = new HashMap<Object, Object>(){{
611+
put("f1", "abc");
612+
put("f2", "abc");
613+
put("f3", null);
614+
}};
615+
616+
SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap);
617+
Map<String, Object> stringObjectMap = new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord);
618+
Assert.assertEquals(kafkaConnectMap, stringObjectMap
619+
);
620+
}
621+
622+
@Test
623+
public void testInvalidMapSchemalessNestedMapNullValue() {
624+
Map kafkaConnectMap = new HashMap<Object, Object>(){{
625+
put("f1", "abc");
626+
put("f2", "abc");
627+
put("f3", new HashMap<Object, Object>() {{
628+
put("f31", "xyz");
629+
put("f32", null);
630+
}});
631+
}};
632+
633+
SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap);
634+
Map<String, Object> stringObjectMap = new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE)
635+
.convertRecord(kafkaConnectRecord);
636+
Assert.assertEquals(kafkaConnectMap, stringObjectMap);
637+
}
638+
607639
@Test
608640
public void testMapSchemalessConvertDouble() {
609641
Map kafkaConnectMap = new HashMap<Object, Object>(){{

0 commit comments

Comments
 (0)