
Commit 2c498c6

add nested json format parsing
1 parent daaad43 commit 2c498c6

File tree (5 files changed: +154, -58 lines)

- core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java
- core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java
- docs/serverSocketSource.md
- kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java
- kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/CustomerJsonDeserialization.java

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 30 additions & 3 deletions
```diff
@@ -22,6 +22,7 @@
 
 import com.dtstack.flink.sql.util.ClassUtil;
 import com.dtstack.flink.sql.util.DtStringUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
 import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Maps;
 
@@ -83,17 +84,43 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
         for(String fieldRow : fieldRows){
             fieldRow = fieldRow.trim();
 
-            boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
+            String[] filedInfoArr = fieldRow.split("\\s+");
 
+            boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
             if(isMatcherKey){
                 continue;
             }
 
-            String[] filedInfoArr = fieldRow.split("\\s+");
-            if(filedInfoArr.length < 2){
+            if(filedInfoArr.length < 2 ){
                 throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
             }
 
+            if(filedInfoArr.length > 2 ){
+                throw new RuntimeException("mapping field can't contain spaces.");
+            }
+
+            String physicalField=null;
+
+            if (filedInfoArr[0].contains("(")){
+                String first=filedInfoArr[0];
+                int leftIndex=first.indexOf("(");
+                int rightIndex=first.indexOf(")");
+
+                String newFirst=first.substring(0,leftIndex).trim();
+                filedInfoArr[0]=newFirst;
+
+                physicalField=first.substring(leftIndex+1,rightIndex).trim();
+            }
+
+            if (StringUtils.isNotBlank(physicalField)){
+                tableInfo.addPhysicalMappings(filedInfoArr[0],physicalField);
+            }else {
+                tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
+            }
+
+
+
+
             //Compatible situation may arise in space in the fieldName
             String[] filedNameArr = new String[filedInfoArr.length - 1];
             System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
```
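
Net effect of this hunk: a field declaration may carry a physical-path alias in parentheses, e.g. `channel(detail.channel) varchar`, while an unaliased field maps to itself. A minimal standalone sketch of that extraction step, with illustrative class and variable names rather than the project's API:

```java
import java.util.HashMap;
import java.util.Map;

public class FieldMappingSketch {
    public static void main(String[] args) {
        // One trimmed field row from a CREATE TABLE statement.
        String fieldRow = "channel(detail.channel) varchar";

        String[] parts = fieldRow.split("\\s+");
        Map<String, String> physicalFields = new HashMap<>();

        String logicalName = parts[0];
        if (logicalName.contains("(")) {
            int left = logicalName.indexOf('(');
            int right = logicalName.indexOf(')');
            // The parenthesized part is the physical (possibly nested) JSON path.
            String physical = logicalName.substring(left + 1, right).trim();
            logicalName = logicalName.substring(0, left).trim();
            physicalFields.put(logicalName, physical);
        } else {
            // No alias: the row field maps to a JSON field of the same name.
            physicalFields.put(logicalName, logicalName);
        }

        System.out.println(physicalFields); // {channel=detail.channel}
    }
}
```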

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 20 additions & 0 deletions
```diff
@@ -20,10 +20,12 @@
 
 package com.dtstack.flink.sql.table;
 
+import com.google.common.collect.Maps;
 import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Reason:
@@ -56,6 +58,9 @@ public abstract class TableInfo implements Serializable {
 
     private final List<Class> fieldClassList = Lists.newArrayList();
 
+    /** handling nested data structures **/
+    private Map<String, String> physicalFields = Maps.newHashMap();
+
     private List<String> primaryKeys;
 
     private Integer parallelism = 1;
@@ -154,7 +159,22 @@ public List<Class> getFieldClassList() {
         return fieldClassList;
     }
 
+    public Map<String, String> getPhysicalFields() {
+        return physicalFields;
+    }
 
+    public void setPhysicalFields(Map<String, String> physicalFields) {
+        this.physicalFields = physicalFields;
+    }
+
+    /**
+     *
+     * @param key row field
+     * @param value physical field
+     */
+    public void addPhysicalMappings(String key,String value){
+        physicalFields.put(key,value);
+    }
 
     public String getFieldDelimiter() {
         return fieldDelimiter;
```
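
For reference, a hypothetical trace of what the parser above would register on a `TableInfo` for a two-field schema, using a plain map in place of the abstract class:

```java
import java.util.HashMap;
import java.util.Map;

public class PhysicalFieldsTrace {
    public static void main(String[] args) {
        // Mirrors TableInfo.addPhysicalMappings (logical row field -> physical JSON field)
        // for the declaration "name varchar, channel(detail.channel) varchar".
        Map<String, String> physicalFields = new HashMap<>();
        physicalFields.put("name", "name");              // flat field maps to itself
        physicalFields.put("channel", "detail.channel"); // alias maps to a nested path
        // e.g. {channel=detail.channel, name=name} (HashMap iteration order not guaranteed)
        System.out.println(physicalFields);
    }
}
```

Keeping the map on `TableInfo`, which is already `Serializable`, appears to be what lets the mapping travel with the table definition to whichever source consumes it.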

docs/serverSocketSource.md

Lines changed: 1 addition & 37 deletions
````diff
@@ -32,42 +32,6 @@ CREATE TABLE MyTable(
 
 ## 3. Server-side example:
 ```
-String str = "{\"CHANNEL\":\"xc3\",\"pv\":1234567,\"xdate\":\"2018-12-07\",\"xtime\":\"2018-12-15\"};";
+String JsonStr = "{\"CHANNEL\":\"xc3\",\"pv\":1234567,\"xdate\":\"2018-12-07\",\"xtime\":\"2018-12-15\"};";
 
-
-public class TimeServerHandler implements Runnable {
-    Socket socket;
-
-    String str = "{\"CHANNEL\":\"xc3\",\"pv\":1234567,\"xdate\":\"2018-12-07\",\"xtime\":\"2018-12-15\"};";
-
-    public TimeServerHandler(Socket socket) {
-        this.socket = socket;
-    }
-
-    public void run() {
-        PrintWriter out = null;
-        try {
-            out = new PrintWriter(this.socket.getOutputStream(), true);
-            while (true) {
-                Thread.sleep(3000);
-                out.println(str);
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-
-            if (out != null) {
-                out.close();
-            }
-            if (socket != null) {
-                try {
-                    socket.close();
-                } catch (IOException e1) {
-                    e1.printStackTrace();
-                }
-            }
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-}
 ```
````

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -92,10 +92,10 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         if ("json".equalsIgnoreCase(kafka011SourceTableInfo.getSourceDataType())) {
             if (topicIsPattern) {
                 kafkaSrc = new CustomerJsonConsumer(Pattern.compile(topicName),
-                        new CustomerJsonDeserialization(typeInformation), props);
+                        new CustomerJsonDeserialization(typeInformation,sourceTableInfo.getPhysicalFields()), props);
             } else {
                 kafkaSrc = new CustomerJsonConsumer(topicName,
-                        new CustomerJsonDeserialization(typeInformation), props);
+                        new CustomerJsonDeserialization(typeInformation,sourceTableInfo.getPhysicalFields()), props);
             }
         } else if ("csv".equalsIgnoreCase(kafka011SourceTableInfo.getSourceDataType())) {
             if (topicIsPattern) {
```
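
For context, the two constructor arguments can be built in isolation like this (a minimal sketch; the field names and mapping literal are hypothetical and mirror the parser example above):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.Map;

public class DeserializerWiringSketch {
    public static void main(String[] args) {
        // Row type as Flink would derive it for fields "pv" and "channel".
        TypeInformation<Row> typeInfo = new RowTypeInfo(
                new TypeInformation<?>[]{Types.LONG, Types.STRING},
                new String[]{"pv", "channel"});

        // What sourceTableInfo.getPhysicalFields() would return for
        // "pv bigint, channel(detail.channel) varchar".
        Map<String, String> physicalFields = new HashMap<>();
        physicalFields.put("pv", "pv");
        physicalFields.put("channel", "detail.channel");

        // The commit threads this pair into the deserializer:
        // new CustomerJsonDeserialization(typeInfo, physicalFields);
        System.out.println(typeInfo + " with mapping " + physicalFields);
    }
}
```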

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/CustomerJsonDeserialization.java

Lines changed: 101 additions & 16 deletions
```diff
@@ -23,9 +23,15 @@
 
 import com.dtstack.flink.sql.source.AbsDeserialization;
 import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
+import com.google.common.collect.Maps;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
@@ -39,7 +45,11 @@
 
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 
 import static com.dtstack.flink.sql.metric.MetricConstant.DT_PARTITION_GROUP;
@@ -77,16 +87,22 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
 
     private boolean firstMsg = true;
 
-    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
+    private Map<String, String> rowAndFieldMapping = Maps.newHashMap();
+
+    private Map<String, JsonNode> nodeAndJsonnodeMapping = Maps.newHashMap();
+
+    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo,Map<String, String> rowAndFieldMapping){
         this.typeInfo = typeInfo;
 
         this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
 
         this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
+
+        this.rowAndFieldMapping=rowAndFieldMapping;
     }
 
     @Override
-    public Row deserialize(byte[] message) throws IOException {
+    public Row deserialize(byte[] message) {
 
         if(firstMsg){
             try {
@@ -103,7 +119,10 @@ public Row deserialize(byte[] message) throws IOException {
         numInBytes.inc(message.length);
 
         JsonNode root = objectMapper.readTree(message);
+        parseTree(root,"");
+
         Row row = new Row(fieldNames.length);
+
         for (int i = 0; i < fieldNames.length; i++) {
             JsonNode node = getIgnoreCase(root, fieldNames[i]);
 
@@ -115,9 +134,7 @@
                     row.setField(i, null);
                 }
             } else {
-                // Read the value as specified type
-                Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
-                row.setField(i, value);
+                row.setField(i, convert(node, fieldTypes[i]));
             }
         }
 
@@ -131,22 +148,29 @@
         }
     }
 
+    public void parseTree(JsonNode jsonNode, String prefix){
+        nodeAndJsonnodeMapping.clear();
+
+        Iterator<String> iterator = jsonNode.fieldNames();
+        while (iterator.hasNext()){
+            String next = iterator.next();
+            JsonNode child = jsonNode.get(next);
+            if (child.isObject()){
+                parseTree(child,next+"."+prefix);
+            }else {
+                nodeAndJsonnodeMapping.put(prefix+next,child);
+            }
+        }
+    }
+
     public void setFailOnMissingField(boolean failOnMissingField) {
         this.failOnMissingField = failOnMissingField;
     }
 
-    public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
-
-        Iterator<String> iter = jsonNode.fieldNames();
-        while (iter.hasNext()) {
-            String key1 = iter.next();
-            if (key1.equalsIgnoreCase(key)) {
-                return jsonNode.get(key1);
-            }
-        }
-
-        return null;
 
+    public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
+        String nodeMappingKey = rowAndFieldMapping.get(key);
+        return nodeAndJsonnodeMapping.get(nodeMappingKey);
     }
 
     public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
@@ -192,4 +216,65 @@ protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception
     private static String partitionLagMetricName(TopicPartition tp) {
         return tp + ".records-lag";
     }
+
+    // --------------------------------------------------------------------------------------------
+
+    private Object convert(JsonNode node, TypeInformation<?> info) {
+        if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
+            return node.asBoolean();
+        } else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
+            return node.asText();
+        } else if (info.getTypeClass().equals(Types.BIG_DEC.getTypeClass())) {
+            return node.decimalValue();
+        } else if (info.getTypeClass().equals(Types.BIG_INT.getTypeClass())) {
+            return node.bigIntegerValue();
+        } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
+            return Date.valueOf(node.asText());
+        } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
+            // according to RFC 3339 every full-time must have a timezone;
+            // until we have full timezone support, we only support UTC;
+            // users can parse their time as string as a workaround
+            final String time = node.asText();
+            if (time.indexOf('Z') < 0 || time.indexOf('.') >= 0) {
+                throw new IllegalStateException(
+                        "Invalid time format. Only a time in UTC timezone without milliseconds is supported yet. " +
+                                "Format: HH:mm:ss'Z'");
+            }
+            return Time.valueOf(time.substring(0, time.length() - 1));
+        } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
+            // according to RFC 3339 every date-time must have a timezone;
+            // until we have full timezone support, we only support UTC;
+            // users can parse their time as string as a workaround
+            final String timestamp = node.asText();
+            if (timestamp.indexOf('Z') < 0) {
+                throw new IllegalStateException(
+                        "Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. " +
+                                "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+            }
+            return Timestamp.valueOf(timestamp.substring(0, timestamp.length() - 1).replace('T', ' '));
+        } else if (info instanceof ObjectArrayTypeInfo) {
+            throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
+        } else if (info instanceof BasicArrayTypeInfo) {
+            throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
+        } else if (info instanceof PrimitiveArrayTypeInfo &&
+                ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
+            return convertByteArray(node);
+        } else {
+            // for types that were specified without JSON schema
+            // e.g. POJOs
+            try {
+                return objectMapper.treeToValue(node, info.getTypeClass());
+            } catch (JsonProcessingException e) {
+                throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
+            }
+        }
+    }
+
+    private Object convertByteArray(JsonNode node) {
+        try {
+            return node.binaryValue();
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to deserialize byte array.", e);
+        }
+    }
 }
```
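
Putting the pieces together: `parseTree` flattens leaf nodes under dot-joined keys, and the reworked `getIgnoreCase` resolves a logical field through `rowAndFieldMapping` into that flattened map. Below is a standalone sketch of that flow under two assumptions: it uses plain Jackson instead of Flink's shaded copy, and it passes the map explicitly instead of keeping it as an instance field. Note the committed recursion builds prefixes as `next + "." + prefix`, which produces the expected `parent.child` keys for one level of nesting.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class NestedJsonLookupSketch {

    // Same shape as the committed parseTree: leaves land under "prefix + fieldName".
    static void parseTree(JsonNode jsonNode, String prefix, Map<String, JsonNode> flattened) {
        Iterator<String> names = jsonNode.fieldNames();
        while (names.hasNext()) {
            String name = names.next();
            JsonNode child = jsonNode.get(name);
            if (child.isObject()) {
                parseTree(child, name + "." + prefix, flattened); // committed ordering
            } else {
                flattened.put(prefix + name, child);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JsonNode root = new ObjectMapper()
                .readTree("{\"pv\":1234567,\"detail\":{\"channel\":\"xc3\"}}");

        Map<String, JsonNode> flattened = new HashMap<>();
        parseTree(root, "", flattened);

        // rowAndFieldMapping as AbsTableParser would build it for
        // "pv bigint, channel(detail.channel) varchar".
        Map<String, String> rowAndFieldMapping = new HashMap<>();
        rowAndFieldMapping.put("pv", "pv");
        rowAndFieldMapping.put("channel", "detail.channel");

        // getIgnoreCase equivalent: logical field -> physical key -> flattened node.
        System.out.println(flattened.get(rowAndFieldMapping.get("pv")).asLong());      // 1234567
        System.out.println(flattened.get(rowAndFieldMapping.get("channel")).asText()); // xc3
    }
}
```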
