|
18 | 18 |
|
19 | 19 | package org.apache.flink.connector.kafka.source.reader.deserializer;
|
20 | 20 |
|
| 21 | +import org.apache.flink.api.common.typeinfo.TypeHint; |
| 22 | +import org.apache.flink.api.common.typeinfo.TypeInformation; |
21 | 23 | import org.apache.flink.connector.kafka.testutils.SimpleCollector;
|
22 | 24 | import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
|
23 | 25 | import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
|
@@ -79,25 +81,17 @@ public void testKafkaDeserializationSchemaWrapper() throws Exception {
|
79 | 81 | @Test
|
80 | 82 | public void testKafkaValueDeserializationSchemaWrapper() throws Exception {
|
81 | 83 | final ConsumerRecord<byte[], byte[]> consumerRecord = getConsumerRecord();
|
82 |
| - KafkaRecordDeserializationSchema< |
83 |
| - org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node |
84 |
| - .ObjectNode> |
85 |
| - schema = |
86 |
| - KafkaRecordDeserializationSchema.valueOnly( |
87 |
| - new JsonDeserializationSchema<>( |
88 |
| - org.apache.flink.shaded.jackson2.com.fasterxml.jackson |
89 |
| - .databind.node.ObjectNode.class)); |
| 84 | + KafkaRecordDeserializationSchema<Map<String, Object>> schema = |
| 85 | + KafkaRecordDeserializationSchema.valueOnly( |
| 86 | + new JsonDeserializationSchema<>( |
| 87 | + TypeInformation.of(new TypeHint<Map<String, Object>>() {}))); |
90 | 88 | schema.open(new DummyInitializationContext());
|
91 |
| - SimpleCollector< |
92 |
| - org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node |
93 |
| - .ObjectNode> |
94 |
| - collector = new SimpleCollector<>(); |
| 89 | + SimpleCollector<Map<String, Object>> collector = new SimpleCollector<>(); |
95 | 90 | schema.deserialize(consumerRecord, collector);
|
96 | 91 |
|
97 | 92 | assertThat(collector.getList()).hasSize(1);
|
98 |
| - org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode |
99 |
| - deserializedValue = collector.getList().get(0); |
100 |
| - assertThat(deserializedValue.get("word").asText()).isEqualTo("world"); |
| 93 | + Map<String, Object> deserializedValue = collector.getList().get(0); |
| 94 | + assertThat(deserializedValue.get("word")).isEqualTo("world"); |
101 | 95 | assertThat(deserializedValue.get("key")).isNull();
|
102 | 96 | assertThat(deserializedValue.get("metadata")).isNull();
|
103 | 97 | }
|
|
0 commit comments