Skip to content

Commit 138a873

Browse files
committed
Merge branch 'v1.8.0_dev_kafkaNew' into 'v1.8.0_dev'
最新kafka版本支持嵌套json和数组类型字段解析 See merge request !112
2 parents 79cd993 + 21b0cf6 commit 138a873

File tree

4 files changed

+30
-16
lines changed

4 files changed

+30
-16
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
2.修复yarnPer模式提交失败的异常。
1818

1919
# 已支持
20-
* 源表:kafka 0.9,1.x版本
20+
* 源表:kafka 0.9,1.x及以上版本
2121
* 维表:mysql,SQLServer,oracle,hbase,mongo,redis,cassandra,serversocket
2222
* 结果表:mysql,SQLServer,oracle,hbase,elasticsearch5.x,mongo,redis,cassandra,console
2323

docs/kafkaSource.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ CREATE TABLE tableName(
2121
```
2222

2323
## 2.支持的版本
24-
kafka08,kafka09,kafka10,kafka11
24+
kafka08,kafka09,kafka10,kafka11及以上版本
2525
**kafka读取和写入的版本必须一致,否则会有兼容性错误。**
2626

2727
## 3.表结构定义

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
3131
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3232
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
3334
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
3435
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
3536
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
@@ -157,21 +158,27 @@ public Row deserialize(byte[] message) throws IOException {
157158

158159
public JsonNode getIgnoreCase(String key) {
159160
String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
160-
JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
161-
if(node == null){
162-
return null;
163-
}
164-
165-
JsonNodeType nodeType = node.getNodeType();
166-
167-
if (nodeType==JsonNodeType.ARRAY){
168-
throw new IllegalStateException("Unsupported type information array .") ;
169-
}
170-
171-
return node;
161+
return nodeAndJsonNodeMapping.get(nodeMappingKey);
172162
}
173163

174164
private void parseTree(JsonNode jsonNode, String prefix){
165+
if (jsonNode.isArray()) {
166+
ArrayNode array = (ArrayNode) jsonNode;
167+
for (int i = 0; i < array.size(); i++) {
168+
JsonNode child = array.get(i);
169+
String nodeKey = getNodeKey(prefix, i);
170+
171+
if (child.isValueNode()) {
172+
nodeAndJsonNodeMapping.put(nodeKey, child);
173+
} else {
174+
if (rowAndFieldMapping.containsValue(nodeKey)) {
175+
nodeAndJsonNodeMapping.put(nodeKey, child);
176+
}
177+
parseTree(child, nodeKey);
178+
}
179+
}
180+
return;
181+
}
175182

176183
Iterator<String> iterator = jsonNode.fieldNames();
177184
while (iterator.hasNext()){
@@ -182,7 +189,7 @@ private void parseTree(JsonNode jsonNode, String prefix){
182189
if (child.isValueNode()){
183190
nodeAndJsonNodeMapping.put(nodeKey, child);
184191
}else if(child.isArray()){
185-
nodeAndJsonNodeMapping.put(nodeKey, new TextNode(child.toString()));
192+
parseTree(child, nodeKey);
186193
}else {
187194
parseTree(child, nodeKey);
188195
}
@@ -197,6 +204,13 @@ private String getNodeKey(String prefix, String nodeName){
197204
return prefix + "." + nodeName;
198205
}
199206

207+
private String getNodeKey(String prefix, int i) {
208+
if (Strings.isNullOrEmpty(prefix)) {
209+
return "[" + i + "]";
210+
}
211+
return prefix + "[" + i + "]";
212+
}
213+
200214
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
201215
this.fetcher = fetcher;
202216
}

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class KafkaSourceParser extends AbsSourceParser {
4040

4141
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
4242

43-
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
43+
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4444

4545
static {
4646
keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);

0 commit comments

Comments
 (0)