|
24 | 24 | import com.dtstack.flink.sql.source.AbsDeserialization; |
25 | 25 | import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric; |
26 | 26 | import org.apache.flink.api.common.typeinfo.TypeInformation; |
| 27 | +import org.apache.flink.api.common.typeinfo.Types; |
27 | 28 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
28 | 29 | import org.apache.flink.calcite.shaded.com.google.common.base.Strings; |
29 | 30 | import org.apache.flink.metrics.MetricGroup; |
30 | 31 | import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; |
| 32 | +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; |
31 | 33 | import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; |
32 | 34 | import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; |
33 | 35 | import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType; |
|
42 | 44 | import org.slf4j.LoggerFactory; |
43 | 45 |
|
44 | 46 | import java.io.IOException; |
| 47 | +import java.lang.reflect.Array; |
45 | 48 | import java.lang.reflect.Field; |
| 49 | +import java.sql.Date; |
| 50 | +import java.sql.Time; |
| 51 | +import java.sql.Timestamp; |
46 | 52 | import java.util.Iterator; |
47 | 53 | import java.util.Map; |
48 | 54 | import java.util.Set; |
@@ -137,7 +143,8 @@ public Row deserialize(byte[] message) throws IOException { |
137 | 143 | } |
138 | 144 | } else { |
139 | 145 | // Read the value as specified type |
140 | | - Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass()); |
| 146 | + |
| 147 | + Object value = convert(node, fieldTypes[i]); |
141 | 148 | row.setField(i, value); |
142 | 149 | } |
143 | 150 | } |
@@ -245,4 +252,29 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exceptio |
245 | 252 | private static String partitionLagMetricName(TopicPartition tp) { |
246 | 253 | return tp + ".records-lag"; |
247 | 254 | } |
| 255 | + |
| 256 | + private Object convert(JsonNode node, TypeInformation<?> info) { |
| 257 | + if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) { |
| 258 | + return node.asBoolean(); |
| 259 | + } else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) { |
| 260 | + return node.asText(); |
| 261 | + } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) { |
| 262 | + return Date.valueOf(node.asText()); |
| 263 | + } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) { |
| 264 | + // local zone |
| 265 | + return Time.valueOf(node.asText()); |
| 266 | + } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) { |
| 267 | + // local zone |
| 268 | + return Timestamp.valueOf(node.asText()); |
| 269 | + } else { |
| 270 | + // for types that were specified without JSON schema |
| 271 | + // e.g. POJOs |
| 272 | + try { |
| 273 | + return objectMapper.treeToValue(node, info.getTypeClass()); |
| 274 | + } catch (JsonProcessingException e) { |
| 275 | + throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node); |
| 276 | + } |
| 277 | + } |
| 278 | + } |
| 279 | + |
248 | 280 | } |
0 commit comments