
Commit 632eda8

kafka parse timestamp
1 parent 66082b1 commit 632eda8

3 files changed: +69 -5 lines changed

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 35 additions & 2 deletions
@@ -24,10 +24,12 @@
 import com.dtstack.flink.sql.source.AbsDeserialization;
 import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
@@ -43,6 +45,9 @@
 
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -134,17 +139,18 @@ public Row deserialize(byte[] message) throws IOException {
                 }
             } else {
                 // Read the value as specified type
-                Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
+                Object value = convert(node, fieldTypes[i]);
                 row.setField(i, value);
             }
         }
 
         numInResolveRecord.inc();
         return row;
-    } catch (Throwable t) {
+    } catch (Exception e) {
        //add metric of dirty data
        if (dirtyDataCounter.getCount()%rowLenth == 0){
            LOG.info("dirtyData: " + new String(message));
+           LOG.info(" " ,e);
        }
        dirtyDataCounter.inc();
        return null;
@@ -243,4 +249,31 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception
     private static String partitionLagMetricName(TopicPartition tp) {
         return tp + ".records-lag";
     }
+
+    private Object convert(JsonNode node, TypeInformation<?> info) {
+        if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
+            return node.asBoolean();
+        } else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
+            return node.asText();
+        } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
+            return Date.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
+            // local zone
+            return Time.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
+            // local zone
+            return Timestamp.valueOf(node.asText());
+        } else {
+            // for types that were specified without JSON schema
+            // e.g. POJOs
+            try {
+                return objectMapper.treeToValue(node, info.getTypeClass());
+            } catch (JsonProcessingException e) {
+                throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
+            }
+        }
+    }
+
+
+
 }
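
For context on what the new SQL_DATE / SQL_TIME / SQL_TIMESTAMP branches accept: java.sql.Date.valueOf, Time.valueOf and Timestamp.valueOf only parse the JDBC escape formats and throw IllegalArgumentException on anything else, which the surrounding catch (Exception e) block then counts as dirty data. A minimal, self-contained sketch of those formats (the sample values are illustrative, not taken from the commit):

import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

public class SqlTypeParseDemo {
    public static void main(String[] args) {
        Date d = Date.valueOf("2019-06-18");                          // yyyy-[m]m-[d]d
        Time t = Time.valueOf("15:30:00");                            // hh:mm:ss, local zone
        Timestamp ts = Timestamp.valueOf("2019-06-18 15:30:00.123");  // yyyy-[m]m-[d]d hh:mm:ss[.f...], local zone
        System.out.println(d + " / " + t + " / " + ts);

        try {
            // ISO-8601 with a 'T' separator is not accepted by Timestamp.valueOf
            Timestamp.valueOf("2019-06-18T15:30:00");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}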

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 32 additions & 2 deletions
@@ -24,10 +24,12 @@
 import com.dtstack.flink.sql.source.AbsDeserialization;
 import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
@@ -43,6 +45,9 @@
 
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -134,17 +139,18 @@ public Row deserialize(byte[] message) throws IOException {
                 }
             } else {
                 // Read the value as specified type
-                Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
+                Object value = convert(node, fieldTypes[i]);
                 row.setField(i, value);
             }
         }
 
         numInResolveRecord.inc();
         return row;
-    } catch (Throwable t) {
+    } catch (Exception e) {
        //add metric of dirty data
        if (dirtyDataCounter.getCount()%rowLenth == 0){
            LOG.info("dirtyData: " + new String(message));
+           LOG.error(" ", e);
        }
        dirtyDataCounter.inc();
        return null;
@@ -244,4 +250,28 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception
     private static String partitionLagMetricName(TopicPartition tp) {
         return tp + ".records-lag";
     }
+
+    private Object convert(JsonNode node, TypeInformation<?> info) {
+        if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
+            return node.asBoolean();
+        } else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
+            return node.asText();
+        } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
+            return Date.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
+            // local zone
+            return Time.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
+            // local zone
+            return Timestamp.valueOf(node.asText());
+        } else {
+            // for types that were specified without JSON schema
+            // e.g. POJOs
+            try {
+                return objectMapper.treeToValue(node, info.getTypeClass());
+            } catch (JsonProcessingException e) {
+                throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
+            }
+        }
+    }
 }
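
The kafka10 change mirrors the kafka09 one. As an illustration of the code path it adds, here is a self-contained sketch that mimics the SQL_TIMESTAMP branch on a raw JSON message; the payload and the field name event_time are hypothetical, and plain Jackson is used in place of Flink's shaded copy:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.sql.Timestamp;

public class TimestampFieldDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        // A message as it might arrive from Kafka: the timestamp is a plain string field.
        JsonNode root = objectMapper.readTree("{\"id\": 1, \"event_time\": \"2019-06-18 15:30:00\"}");

        // What the SQL_TIMESTAMP branch of convert() effectively does:
        // take the node's text and hand it to Timestamp.valueOf (local zone).
        JsonNode node = root.get("event_time");
        Timestamp ts = Timestamp.valueOf(node.asText());

        System.out.println(ts); // 2019-06-18 15:30:00.0
    }
}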

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 2 additions & 1 deletion
@@ -151,10 +151,11 @@ public Row deserialize(byte[] message) throws IOException {
 
         numInResolveRecord.inc();
         return row;
-    } catch (Throwable t) {
+    } catch (Exception e) {
        //add metric of dirty data
        if (dirtyDataCounter.getCount()%rowLenth == 0){
            LOG.info("dirtyData: " + new String(message));
+           LOG.error("" , e);
        }
        dirtyDataCounter.inc();
        return null;
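
All three modules also narrow the catch from Throwable to Exception and now log the cause alongside the sampled dirty record. A simplified standalone sketch of that dirty-data path, assuming an SLF4J logger and a plain long counter in place of the Flink metric and its rowLenth sampling interval:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirtyDataSampler {
    private static final Logger LOG = LoggerFactory.getLogger(DirtyDataSampler.class);

    // Stand-ins for the Flink counter metric and the rowLenth interval in the real class.
    private long dirtyDataCount = 0L;
    private final long sampleEvery = 1000L;

    /** Mirrors the catch block: log every N-th dirty record with its cause, count it, drop it. */
    public Object handleDirtyRecord(byte[] message, Exception cause) {
        if (dirtyDataCount % sampleEvery == 0) {
            LOG.info("dirtyData: " + new String(message));
            LOG.error("failed to deserialize record", cause);
        }
        dirtyDataCount++;
        return null;
    }
}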
