
Commit aae012e

kafka11 read nest json field
1 parent 2c498c6 commit aae012e


4 files changed: +51 -67 lines changed


core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 4 additions & 30 deletions
@@ -85,49 +85,23 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
             fieldRow = fieldRow.trim();
 
             String[] filedInfoArr = fieldRow.split("\\s+");
-
-            boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
-            if(isMatcherKey){
-                continue;
-            }
-
             if(filedInfoArr.length < 2 ){
                 throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
             }
 
-            if(filedInfoArr.length > 2 ){
-                throw new RuntimeException("mapping field can't contain spaces.");
-            }
-
-            String physicalField=null;
-
-            if (filedInfoArr[0].contains("(")){
-                String first=filedInfoArr[0];
-                int leftIndex=first.indexOf("(");
-                int rightIndex=first.indexOf(")");
-
-                String newFirst=first.substring(0,leftIndex).trim();
-                filedInfoArr[0]=newFirst;
-
-                physicalField=first.substring(leftIndex+1,rightIndex).trim();
-            }
-
-            if (StringUtils.isNotBlank(physicalField)){
-                tableInfo.addPhysicalMappings(filedInfoArr[0],physicalField);
-            }else {
-                tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
+            boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
+            if(isMatcherKey){
+                continue;
             }
 
-
-
-
             //Compatible situation may arise in space in the fieldName
             String[] filedNameArr = new String[filedInfoArr.length - 1];
             System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
             String fieldName = String.join(" ", filedNameArr);
             String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
             Class fieldClass = ClassUtil.stringConvertClass(fieldType);
 
+            tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
             tableInfo.addField(fieldName);
             tableInfo.addFieldClass(fieldClass);
             tableInfo.addFieldType(fieldType);
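Note on the hunk above: AbsTableParser no longer strips a logical(physical) mapping out of the field name itself. The key-pattern check now runs first, so a row that matches a registered pattern is handed to its handler and skipped, and every remaining plain row gets an identity physical mapping. A minimal standalone sketch of that simplified flow, using the made-up row "id int" (not code from this commit):

// Minimal sketch (not project code) of how a plain field row is handled now:
// rows matching a registered key pattern are consumed by their handler first,
// everything else is split on whitespace and mapped one-to-one.
public class FieldRowSketch {
    public static void main(String[] args) {
        String fieldRow = "id int";                        // hypothetical plain field row
        String[] filedInfoArr = fieldRow.trim().split("\\s+");

        if (filedInfoArr.length < 2) {
            throw new RuntimeException("field format error: " + fieldRow);
        }

        // the field name may contain spaces; the last token is always the type
        String fieldName = String.join(" ",
                java.util.Arrays.copyOf(filedInfoArr, filedInfoArr.length - 1));
        String fieldType = filedInfoArr[filedInfoArr.length - 1].trim();

        // identity mapping: the logical field name is also the physical field name
        System.out.println(fieldName + " -> " + fieldName + " : " + fieldType);
    }
}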

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/CustomerJsonDeserialization.java

Lines changed: 6 additions & 22 deletions
@@ -140,9 +140,9 @@ public Row deserialize(byte[] message) {
 
             numInResolveRecord.inc();
             return row;
-        } catch (Throwable t) {
+        } catch (Exception e) {
             //add metric of dirty data
-            LOG.error(t.getMessage());
+            LOG.error(e.getMessage());
             dirtyDataCounter.inc();
             return null;
         }
@@ -231,27 +231,11 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
         } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
             return Date.valueOf(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
-            // according to RFC 3339 every full-time must have a timezone;
-            // until we have full timezone support, we only support UTC;
-            // users can parse their time as string as a workaround
-            final String time = node.asText();
-            if (time.indexOf('Z') < 0 || time.indexOf('.') >= 0) {
-                throw new IllegalStateException(
-                        "Invalid time format. Only a time in UTC timezone without milliseconds is supported yet. " +
-                                "Format: HH:mm:ss'Z'");
-            }
-            return Time.valueOf(time.substring(0, time.length() - 1));
+            // local zone
+            return Time.valueOf(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
-            // according to RFC 3339 every date-time must have a timezone;
-            // until we have full timezone support, we only support UTC;
-            // users can parse their time as string as a workaround
-            final String timestamp = node.asText();
-            if (timestamp.indexOf('Z') < 0) {
-                throw new IllegalStateException(
-                        "Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. " +
-                                "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
-            }
-            return Timestamp.valueOf(timestamp.substring(0, timestamp.length() - 1).replace('T', ' '));
+            // local zone
+            return Timestamp.valueOf(node.asText());
         } else if (info instanceof ObjectArrayTypeInfo) {
             throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
         } else if (info instanceof BasicArrayTypeInfo) {
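Note on the hunk above: convert now passes SQL TIME and TIMESTAMP text straight to java.sql.Time.valueOf and java.sql.Timestamp.valueOf, so the RFC 3339 "Z" (UTC) requirement is gone and values are read in the JVM's local time zone. A standalone illustration of the formats those JDK methods accept (the sample values are made up); a payload that still carries the old yyyy-MM-dd'T'HH:mm:ss'Z' form would now fall into the catch (Exception e) branch above and be counted as dirty data:

import java.sql.Time;
import java.sql.Timestamp;

// Standalone illustration (not the deserializer itself) of the formats accepted
// after the change: both methods parse in the JVM's local time zone, no 'Z' suffix.
public class LocalZoneValueOfSketch {
    public static void main(String[] args) {
        // Time.valueOf expects "HH:mm:ss"
        Time time = Time.valueOf("13:45:30");
        // Timestamp.valueOf expects "yyyy-MM-dd HH:mm:ss[.fff...]"
        Timestamp timestamp = Timestamp.valueOf("2019-01-01 13:45:30.123");
        System.out.println(time + " / " + timestamp);
    }
}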

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 26 additions & 0 deletions
@@ -20,10 +20,14 @@
 package com.dtstack.flink.sql.source.kafka.table;
 
 import com.dtstack.flink.sql.table.AbsSourceParser;
+import com.dtstack.flink.sql.table.SourceTableInfo;
 import com.dtstack.flink.sql.table.TableInfo;
+import com.dtstack.flink.sql.util.ClassUtil;
 import com.dtstack.flink.sql.util.MathUtil;
 
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Reason:
@@ -35,6 +39,28 @@
 
 public class KafkaSourceParser extends AbsSourceParser {
 
+    private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
+
+    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
+
+    static {
+        keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
+
+        keyHandlerMap.put(KAFKA_NEST_FIELD_KEY, KafkaSourceParser::dealNestField);
+    }
+
+    static void dealNestField(Matcher matcher, TableInfo tableInfo) {
+        SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
+        String physicalField = matcher.group(1);
+        String fieldType = matcher.group(3);
+        String mappingField = matcher.group(4);
+        Class fieldClass=ClassUtil.stringConvertClass(fieldType);
+
+        tableInfo.addPhysicalMappings(mappingField,physicalField);
+        tableInfo.addField(mappingField);
+        tableInfo.addFieldClass(fieldClass);
+        tableInfo.addFieldType(fieldType);
+    }
 
     @Override
     public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
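Note on the hunk above: the new kafkaNestFieldKeyPattern lets a field row of the form <json.path> <type> AS <alias> map a nested JSON attribute onto a flat column; dealNestField registers the alias as the table field and the dotted path as its physical mapping. A standalone check of what the regex captures, using the made-up row "data.user.id int AS userId":

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone check (not project code) of the nested-field pattern added above;
// the field row "data.user.id int AS userId" is a made-up example.
public class NestFieldPatternSketch {
    public static void main(String[] args) {
        Pattern p = Pattern.compile("(?i)((\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
        Matcher m = p.matcher("data.user.id int AS userId");
        if (m.matches()) {
            System.out.println("physical field: " + m.group(1)); // data.user.id
            System.out.println("field type    : " + m.group(3)); // int
            System.out.println("mapped field  : " + m.group(4)); // userId
        }
    }
}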

pom.xml

Lines changed: 15 additions & 15 deletions
@@ -10,22 +10,22 @@
     <url>http://maven.apache.org</url>
     <modules>
         <module>core</module>
-        <module>kafka09</module>
-        <module>kafka10</module>
+        <!--<module>kafka09</module>-->
+        <!--<module>kafka10</module>-->
         <module>kafka11</module>
-        <module>mysql</module>
-        <module>hbase</module>
-        <module>elasticsearch5</module>
-        <module>mongo</module>
-        <module>redis5</module>
-        <module>launcher</module>
-        <module>rdb</module>
-        <module>sqlserver</module>
-        <module>oracle</module>
-        <module>cassandra</module>
-        <module>kafka08</module>
-        <module>serversocket</module>
-        <module>console</module>
+        <!--<module>mysql</module>-->
+        <!--<module>hbase</module>-->
+        <!--<module>elasticsearch5</module>-->
+        <!--<module>mongo</module>-->
+        <!--<module>redis5</module>-->
+        <!--<module>launcher</module>-->
+        <!--<module>rdb</module>-->
+        <!--<module>sqlserver</module>-->
+        <!--<module>oracle</module>-->
+        <!--<module>cassandra</module>-->
+        <!--<module>kafka08</module>-->
+        <!--<module>serversocket</module>-->
+        <!--<module>console</module>-->
     </modules>
0 commit comments
