Commit 11a840c

Merge remote-tracking branch 'origin/v1.5.0_dev' into v1.8.0_dev

# Conflicts:
#	docs/kafkaSource.md
#	kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
#	kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java
#	kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java
#	kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

2 parents: 4f38b7f + d81c58d

File tree: 7 files changed (+163 −54)

docs/kafkaSource.md

Lines changed: 69 additions & 1 deletion

@@ -1,4 +1,3 @@
-# I. JSON-format data source
 ## 1. Format:
 ```
 Data currently supports the JSON format, e.g. {"xx":"bb","cc":"dd"}
@@ -71,6 +70,75 @@ CREATE TABLE MyTable(
     sourcedatatype ='json'   # optional
 );
 ```
+## 6. Supports parsing nested JSON and array-type fields
+
+Example: parsing nested JSON
+
+json: {"name":"tom", "obj":{"channel": "root"}, "pv": 4, "xctime":1572932485}
+```
+CREATE TABLE MyTable(
+    name varchar,
+    obj.channel varchar as channel,
+    pv INT,
+    xctime bigint,
+    CHARACTER_LENGTH(channel) AS timeLeng
+)WITH(
+    type ='kafka09',
+    bootstrapServers ='172.16.8.198:9092',
+    zookeeperQuorum ='172.16.8.198:2181/kafka',
+    offsetReset ='latest',
+    groupId='nbTest',
+    topic ='nbTest1,nbTest2,nbTest3',
+    --- topic ='mqTest.*',
+    ---topicIsPattern='true',
+    parallelism ='1'
+);
+```
+
+Example: parsing array-type fields
+
+json: {"name":"tom", "obj":{"channel": "root"}, "user": [{"pv": 4}, {"pv": 10}], "xctime":1572932485}
+```
+CREATE TABLE MyTable(
+    name varchar,
+    obj.channel varchar as channel,
+    user[1].pv INT as pv,
+    xctime bigint,
+    CHARACTER_LENGTH(channel) AS timeLeng
+)WITH(
+    type ='kafka09',
+    bootstrapServers ='172.16.8.198:9092',
+    zookeeperQuorum ='172.16.8.198:2181/kafka',
+    offsetReset ='latest',
+    groupId='nbTest',
+    topic ='nbTest1,nbTest2,nbTest3',
+    --- topic ='mqTest.*',
+    ---topicIsPattern='true',
+    parallelism ='1'
+);
+```
+or
+
+json: {"name":"tom", "obj":{"channel": "root"}, "pv": [4, 7, 10], "xctime":1572932485}
+```
+CREATE TABLE MyTable(
+    name varchar,
+    obj.channel varchar as channel,
+    pv[1] INT as pv,
+    xctime bigint,
+    CHARACTER_LENGTH(channel) AS timeLeng
+)WITH(
+    type ='kafka09',
+    bootstrapServers ='172.16.8.198:9092',
+    zookeeperQuorum ='172.16.8.198:2181/kafka',
+    offsetReset ='latest',
+    groupId='nbTest',
+    topic ='nbTest1,nbTest2,nbTest3',
+    --- topic ='mqTest.*',
+    ---topicIsPattern='true',
+    parallelism ='1'
+);
+```
 # II. CSV-format data source
 Data is split by the field delimiter and matched, in order, against the columns configured in the SQL. If the number of split fields equals the number of configured columns, they are matched directly; otherwise the lengthcheckpolicy strategy is applied.
 ## 1. Parameters:
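
Note: the nested-field columns above are resolved against a flattened view of each record. The deserializer walks the JSON tree and registers every reachable value under a dotted, subscripted key (see the CustomerJsonDeserialization changes below), so `obj.channel` and `user[1].pv` become plain map lookups. Here is a minimal standalone sketch of that flattening, using plain Jackson rather than Flink's shaded copy; the class name and the `flatten` helper are illustrative, not part of the connector:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonFlattenSketch {

    // Walks a JSON tree and registers every value node under a path-style key:
    // object fields append ".name", array elements append "[i]".
    static void flatten(JsonNode node, String prefix, Map<String, JsonNode> out) {
        if (node.isArray()) {
            for (int i = 0; i < node.size(); i++) {
                String key = prefix.isEmpty() ? "[" + i + "]" : prefix + "[" + i + "]";
                JsonNode child = node.get(i);
                if (child.isValueNode()) {
                    out.put(key, child);
                } else {
                    flatten(child, key, out);
                }
            }
            return;
        }
        Iterator<String> names = node.fieldNames();
        while (names.hasNext()) {
            String name = names.next();
            String key = prefix.isEmpty() ? name : prefix + "." + name;
            JsonNode child = node.get(name);
            if (child.isValueNode()) {
                out.put(key, child);
            } else {
                flatten(child, key, out);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"name\":\"tom\", \"obj\":{\"channel\": \"root\"},"
                + " \"user\": [{\"pv\": 4}, {\"pv\": 10}], \"xctime\":1572932485}";
        Map<String, JsonNode> flat = new LinkedHashMap<>();
        flatten(new ObjectMapper().readTree(json), "", flat);
        flat.forEach((k, v) -> System.out.println(k + " = " + v));
        // name = "tom"
        // obj.channel = "root"
        // user[0].pv = 4
        // user[1].pv = 10
        // xctime = 1572932485
    }
}
```

Run on the sample record above, this prints `name`, `obj.channel`, `user[0].pv`, `user[1].pv` and `xctime`; the subscripts in the flattened keys are zero-based, matching the loop index in the connector code below.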

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 30 additions & 17 deletions
@@ -33,8 +33,6 @@
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.types.Row;
@@ -43,7 +41,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.sql.Date;
@@ -158,25 +156,32 @@ public Row deserialize(byte[] message) throws IOException {
         }
     }

-    private JsonNode getIgnoreCase(String key) {
+    public JsonNode getIgnoreCase(String key) {
         String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
-        JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
-        if(node == null){
-            return null;
-        }
-
-        JsonNodeType nodeType = node.getNodeType();
-
-        if (nodeType == JsonNodeType.ARRAY){
-            throw new IllegalStateException("Unsupported type information array .") ;
-        }
-
-        return node;
+        return nodeAndJsonNodeMapping.get(nodeMappingKey);
     }


    private void parseTree(JsonNode jsonNode, String prefix){

+        if (jsonNode.isArray()) {
+            ArrayNode array = (ArrayNode) jsonNode;
+            for (int i = 0; i < array.size(); i++) {
+                JsonNode child = array.get(i);
+                String nodeKey = getNodeKey(prefix, i);
+
+                if (child.isValueNode()) {
+                    nodeAndJsonNodeMapping.put(nodeKey, child);
+                } else {
+                    if (rowAndFieldMapping.containsValue(nodeKey)) {
+                        nodeAndJsonNodeMapping.put(nodeKey, child);
+                    }
+                    parseTree(child, nodeKey);
+                }
+            }
+            return;
+        }
+
        Iterator<String> iterator = jsonNode.fieldNames();
        while (iterator.hasNext()){
            String next = iterator.next();
@@ -186,7 +191,7 @@ private void parseTree(JsonNode jsonNode, String prefix){
            if (child.isValueNode()){
                nodeAndJsonNodeMapping.put(nodeKey, child);
            } else if(child.isArray()){
-                nodeAndJsonNodeMapping.put(nodeKey, new TextNode(child.toString()));
+                parseTree(child, nodeKey);
            }else {
                parseTree(child, nodeKey);
            }
@@ -201,6 +206,14 @@ private String getNodeKey(String prefix, String nodeName){
        return prefix + "." + nodeName;
    }

+    private String getNodeKey(String prefix, int i) {
+        if (Strings.isNullOrEmpty(prefix)) {
+            return "[" + i + "]";
+        }
+
+        return prefix + "[" + i + "]";
+    }
+
    public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
        this.fetcher = fetcher;
    }
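
Taken together, the kafka09 changes remove both array roadblocks: getIgnoreCase no longer throws `IllegalStateException` for array nodes, and parseTree, instead of stashing a whole array as a stringified TextNode, now recurses into it, registering each element under `prefix[i]` via the new `getNodeKey(String, int)` overload (container elements are additionally registered when their key is explicitly referenced in rowAndFieldMapping). This is what makes the `user[1].pv` and `pv[1]` columns from the docs resolvable; the standalone sketch after the docs section mirrors this walk.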

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ public class KafkaSourceParser extends AbsSourceParser {

    private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";

-    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
+    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");

    static {
        keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
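
The only functional change here widens the path-segment character class from `\\w+` to `\\S+`, so nested-field declarations with array subscripts (e.g. `user[1].pv INT AS pv` from the docs above) now match; `[` and `]` are not word characters. A quick self-contained check, with the two pattern strings copied verbatim from this diff and sample declarations borrowed from the docs:

```java
import java.util.regex.Pattern;

public class NestFieldPatternCheck {

    public static void main(String[] args) {
        // Old pattern: path segments are \w+, so '[' and ']' can never match.
        Pattern oldPattern = Pattern.compile(
                "(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
        // New pattern: path segments are \S+, so subscripted paths are accepted.
        Pattern newPattern = Pattern.compile(
                "(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");

        String plain = "obj.channel varchar AS channel";
        String subscripted = "user[1].pv INT AS pv";

        System.out.println(oldPattern.matcher(plain).matches());        // true
        System.out.println(oldPattern.matcher(subscripted).matches());  // false
        System.out.println(newPattern.matcher(plain).matches());        // true
        System.out.println(newPattern.matcher(subscripted).matches());  // true
    }
}
```

A side effect worth noting: `\\S+` accepts any non-whitespace segment, so validation of the path itself is left to downstream parsing.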

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 32 additions & 18 deletions
@@ -33,8 +33,7 @@
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.types.Row;
@@ -161,34 +160,41 @@ public Row deserialize(byte[] message) throws IOException {

     public JsonNode getIgnoreCase(String key) {
         String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
-        JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);

-        if(node == null){
-            return null;
-        }
-
-        JsonNodeType nodeType = node.getNodeType();
-
-        if (nodeType==JsonNodeType.ARRAY){
-            throw new IllegalStateException("Unsupported type information array .") ;
-        }
-
-        return node;
+        return nodeAndJsonNodeMapping.get(nodeMappingKey);
     }

     private void parseTree(JsonNode jsonNode, String prefix){

+        if (jsonNode.isArray()) {
+            ArrayNode array = (ArrayNode) jsonNode;
+            for (int i = 0; i < array.size(); i++) {
+                JsonNode child = array.get(i);
+                String nodeKey = getNodeKey(prefix, i);
+
+                if (child.isValueNode()) {
+                    nodeAndJsonNodeMapping.put(nodeKey, child);
+                } else {
+                    if (rowAndFieldMapping.containsValue(nodeKey)) {
+                        nodeAndJsonNodeMapping.put(nodeKey, child);
+                    }
+                    parseTree(child, nodeKey);
+                }
+            }
+            return;
+        }
+
        Iterator<String> iterator = jsonNode.fieldNames();
        while (iterator.hasNext()){
            String next = iterator.next();
            JsonNode child = jsonNode.get(next);
            String nodeKey = getNodeKey(prefix, next);

-            if (child.isValueNode()){
+            if (child.isValueNode()) {
                nodeAndJsonNodeMapping.put(nodeKey, child);
-            }else if(child.isArray()){
-                nodeAndJsonNodeMapping.put(nodeKey, new TextNode(child.toString()));
-            }else {
+            } else if(child.isArray()){
+                parseTree(child, nodeKey);
+            } else {
                parseTree(child, nodeKey);
            }
        }
@@ -202,6 +208,14 @@ private String getNodeKey(String prefix, String nodeName){
        return prefix + "." + nodeName;
    }

+    private String getNodeKey(String prefix, int i) {
+        if (Strings.isNullOrEmpty(prefix)) {
+            return "[" + i + "]";
+        }
+
+        return prefix + "[" + i + "]";
+    }
+
    public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
        this.fetcher = fetcher;
    }

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ public class KafkaSourceParser extends AbsSourceParser {

    private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";

-    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
+    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");

    static {
        keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 29 additions & 15 deletions
@@ -32,8 +32,8 @@
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
@@ -45,7 +45,6 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
-import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.sql.Date;
 import java.sql.Time;
@@ -165,22 +164,29 @@ public Row deserialize(byte[] message) throws IOException {

     public JsonNode getIgnoreCase(String key) {
         String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
-        JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
-        if(node == null){
-            return null;
-        }
-
-        JsonNodeType nodeType = node.getNodeType();
-
-        if (nodeType==JsonNodeType.ARRAY){
-            throw new IllegalStateException("Unsupported type information array .") ;
-        }
-
-        return node;
+        return nodeAndJsonNodeMapping.get(nodeMappingKey);
     }

     private void parseTree(JsonNode jsonNode, String prefix){

+        if (jsonNode.isArray()) {
+            ArrayNode array = (ArrayNode) jsonNode;
+            for (int i = 0; i < array.size(); i++) {
+                JsonNode child = array.get(i);
+                String nodeKey = getNodeKey(prefix, i);
+
+                if (child.isValueNode()) {
+                    nodeAndJsonNodeMapping.put(nodeKey, child);
+                } else {
+                    if (rowAndFieldMapping.containsValue(nodeKey)) {
+                        nodeAndJsonNodeMapping.put(nodeKey, child);
+                    }
+                    parseTree(child, nodeKey);
+                }
+            }
+            return;
+        }
+
        Iterator<String> iterator = jsonNode.fieldNames();
        while (iterator.hasNext()){
            String next = iterator.next();
@@ -190,7 +196,7 @@ private void parseTree(JsonNode jsonNode, String prefix){
            if (child.isValueNode()){
                nodeAndJsonNodeMapping.put(nodeKey, child);
            }else if(child.isArray()){
-                nodeAndJsonNodeMapping.put(nodeKey, new TextNode(child.toString()));
+                parseTree(child, nodeKey);
            }else {
                parseTree(child, nodeKey);
            }
@@ -205,6 +211,14 @@ private String getNodeKey(String prefix, String nodeName){
        return prefix + "." + nodeName;
    }

+    private String getNodeKey(String prefix, int i) {
+        if (Strings.isNullOrEmpty(prefix)) {
+            return "[" + i + "]";
+        }
+
+        return prefix + "[" + i + "]";
+    }
+
    public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
        this.fetcher = fetcher;
    }

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ public class KafkaSourceParser extends AbsSourceParser {

    private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";

-    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
+    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");

    static {
        keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
