Skip to content

Commit bb1aa1b

Browse files
committed
Merge branch 'v1.5.0_dev' of ssh://git.dtstack.cn:10022/dtstack/dt-center-flinkStreamSQL into 1.5_dev_split_str
2 parents a93d7e0 + d81c58d commit bb1aa1b

File tree

7 files changed

+167
-54
lines changed

7 files changed

+167
-54
lines changed

docs/kafkaSource.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,73 @@ CREATE TABLE MyTable(
6363
parallelism ='1'
6464
);
6565
```
66+
67+
## 6.支持嵌套json、数组类型字段解析
68+
69+
嵌套json解析示例
70+
71+
json: {"name":"tom", "obj":{"channel": "root"}, "pv": 4, "xctime":1572932485}
72+
```
73+
CREATE TABLE MyTable(
74+
name varchar,
75+
obj.channel varchar as channel,
76+
pv INT,
77+
xctime bigint,
78+
CHARACTER_LENGTH(channel) AS timeLeng
79+
)WITH(
80+
type ='kafka09',
81+
bootstrapServers ='172.16.8.198:9092',
82+
zookeeperQuorum ='172.16.8.198:2181/kafka',
83+
offsetReset ='latest',
84+
groupId='nbTest',
85+
topic ='nbTest1,nbTest2,nbTest3',
86+
--- topic ='mqTest.*',
87+
---topicIsPattern='true',
88+
parallelism ='1'
89+
);
90+
```
91+
92+
数组类型字段解析示例
93+
94+
json: {"name":"tom", "obj":{"channel": "root"}, "user": [{"pv": 4}, {"pv": 10}], "xctime":1572932485}
95+
```
96+
CREATE TABLE MyTable(
97+
name varchar,
98+
obj.channel varchar as channel,
99+
user[1].pv INT as pv,
100+
xctime bigint,
101+
CHARACTER_LENGTH(channel) AS timeLeng
102+
)WITH(
103+
type ='kafka09',
104+
bootstrapServers ='172.16.8.198:9092',
105+
zookeeperQuorum ='172.16.8.198:2181/kafka',
106+
offsetReset ='latest',
107+
groupId='nbTest',
108+
topic ='nbTest1,nbTest2,nbTest3',
109+
--- topic ='mqTest.*',
110+
---topicIsPattern='true',
111+
parallelism ='1'
112+
);
113+
```
114+
or
115+
116+
json: {"name":"tom", "obj":{"channel": "root"}, "pv": [4, 7, 10], "xctime":1572932485}
117+
```
118+
CREATE TABLE MyTable(
119+
name varchar,
120+
obj.channel varchar as channel,
121+
pv[1] INT as pv,
122+
xctime bigint,
123+
CHARACTER_LENGTH(channel) AS timeLeng
124+
)WITH(
125+
type ='kafka09',
126+
bootstrapServers ='172.16.8.198:9092',
127+
zookeeperQuorum ='172.16.8.198:2181/kafka',
128+
offsetReset ='latest',
129+
groupId='nbTest',
130+
topic ='nbTest1,nbTest2,nbTest3',
131+
--- topic ='mqTest.*',
132+
---topicIsPattern='true',
133+
parallelism ='1'
134+
);
135+
```

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
3333
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3434
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
35-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
3635
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
3736
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
3837
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
@@ -42,7 +41,7 @@
4241
import org.apache.kafka.common.TopicPartition;
4342
import org.slf4j.Logger;
4443
import org.slf4j.LoggerFactory;
45-
44+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
4645
import java.io.IOException;
4746
import java.lang.reflect.Field;
4847
import java.sql.Date;
@@ -159,29 +158,37 @@ public Row deserialize(byte[] message) throws IOException {
159158
}
160159
}
161160

162-
public void setFailOnMissingField(boolean failOnMissingField) {
163-
this.failOnMissingField = failOnMissingField;
164-
}
165-
166-
private JsonNode getIgnoreCase(String key) {
161+
public JsonNode getIgnoreCase(String key) {
167162
String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
168-
JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
169-
if(node == null){
170-
return null;
171-
}
172-
173-
JsonNodeType nodeType = node.getNodeType();
174163

175-
if (nodeType == JsonNodeType.ARRAY){
176-
throw new IllegalStateException("Unsupported type information array .") ;
177-
}
164+
return nodeAndJsonNodeMapping.get(nodeMappingKey);
165+
}
178166

179-
return node;
167+
public void setFailOnMissingField(boolean failOnMissingField) {
168+
this.failOnMissingField = failOnMissingField;
180169
}
181170

182171

183172
private void parseTree(JsonNode jsonNode, String prefix){
184173

174+
if (jsonNode.isArray()) {
175+
ArrayNode array = (ArrayNode) jsonNode;
176+
for (int i = 0; i < array.size(); i++) {
177+
JsonNode child = array.get(i);
178+
String nodeKey = getNodeKey(prefix, i);
179+
180+
if (child.isValueNode()) {
181+
nodeAndJsonNodeMapping.put(nodeKey, child);
182+
} else {
183+
if (rowAndFieldMapping.containsValue(nodeKey)) {
184+
nodeAndJsonNodeMapping.put(nodeKey, child);
185+
}
186+
parseTree(child, nodeKey);
187+
}
188+
}
189+
return;
190+
}
191+
185192
Iterator<String> iterator = jsonNode.fieldNames();
186193
while (iterator.hasNext()){
187194
String next = iterator.next();
@@ -191,7 +198,7 @@ private void parseTree(JsonNode jsonNode, String prefix){
191198
if (child.isValueNode()){
192199
nodeAndJsonNodeMapping.put(nodeKey, child);
193200
} else if(child.isArray()){
194-
nodeAndJsonNodeMapping.put(nodeKey, new TextNode(child.toString()));
201+
parseTree(child, nodeKey);
195202
}else {
196203
parseTree(child, nodeKey);
197204
}
@@ -206,6 +213,14 @@ private String getNodeKey(String prefix, String nodeName){
206213
return prefix + "." + nodeName;
207214
}
208215

216+
private String getNodeKey(String prefix, int i) {
217+
if (Strings.isNullOrEmpty(prefix)) {
218+
return "[" + i + "]";
219+
}
220+
221+
return prefix + "[" + i + "]";
222+
}
223+
209224
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
210225
this.fetcher = fetcher;
211226
}

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class KafkaSourceParser extends AbsSourceParser {
4545

4646
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
4747

48-
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
48+
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4949

5050
static {
5151
keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@
3232
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
3333
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3434
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
35-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
36-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
35+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
3736
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
3837
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
3938
import org.apache.flink.types.Row;
@@ -161,19 +160,8 @@ public Row deserialize(byte[] message) throws IOException {
161160

162161
public JsonNode getIgnoreCase(String key) {
163162
String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
164-
JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
165163

166-
if(node == null){
167-
return null;
168-
}
169-
170-
JsonNodeType nodeType = node.getNodeType();
171-
172-
if (nodeType==JsonNodeType.ARRAY){
173-
throw new IllegalStateException("Unsupported type information array .") ;
174-
}
175-
176-
return node;
164+
return nodeAndJsonNodeMapping.get(nodeMappingKey);
177165
}
178166

179167

@@ -183,17 +171,35 @@ public void setFailOnMissingField(boolean failOnMissingField) {
183171

184172
private void parseTree(JsonNode jsonNode, String prefix){
185173

174+
if (jsonNode.isArray()) {
175+
ArrayNode array = (ArrayNode) jsonNode;
176+
for (int i = 0; i < array.size(); i++) {
177+
JsonNode child = array.get(i);
178+
String nodeKey = getNodeKey(prefix, i);
179+
180+
if (child.isValueNode()) {
181+
nodeAndJsonNodeMapping.put(nodeKey, child);
182+
} else {
183+
if (rowAndFieldMapping.containsValue(nodeKey)) {
184+
nodeAndJsonNodeMapping.put(nodeKey, child);
185+
}
186+
parseTree(child, nodeKey);
187+
}
188+
}
189+
return;
190+
}
191+
186192
Iterator<String> iterator = jsonNode.fieldNames();
187193
while (iterator.hasNext()){
188194
String next = iterator.next();
189195
JsonNode child = jsonNode.get(next);
190196
String nodeKey = getNodeKey(prefix, next);
191197

192-
if (child.isValueNode()){
198+
if (child.isValueNode()) {
193199
nodeAndJsonNodeMapping.put(nodeKey, child);
194-
}else if(child.isArray()){
195-
nodeAndJsonNodeMapping.put(nodeKey, new TextNode(child.toString()));
196-
}else {
200+
} else if(child.isArray()){
201+
parseTree(child, nodeKey);
202+
} else {
197203
parseTree(child, nodeKey);
198204
}
199205
}
@@ -207,6 +213,14 @@ private String getNodeKey(String prefix, String nodeName){
207213
return prefix + "." + nodeName;
208214
}
209215

216+
private String getNodeKey(String prefix, int i) {
217+
if (Strings.isNullOrEmpty(prefix)) {
218+
return "[" + i + "]";
219+
}
220+
221+
return prefix + "[" + i + "]";
222+
}
223+
210224
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
211225
this.fetcher = fetcher;
212226
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class KafkaSourceParser extends AbsSourceParser {
4444

4545
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
4646

47-
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
47+
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4848

4949
static {
5050
keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
3232
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
3333
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
34+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
3435
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
35-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
3636
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
3737
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
3838
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
@@ -44,7 +44,6 @@
4444
import org.slf4j.LoggerFactory;
4545

4646
import java.io.IOException;
47-
import java.lang.reflect.Array;
4847
import java.lang.reflect.Field;
4948
import java.sql.Date;
5049
import java.sql.Time;
@@ -166,18 +165,7 @@ public Row deserialize(byte[] message) throws IOException {
166165

167166
public JsonNode getIgnoreCase(String key) {
168167
String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
169-
JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
170-
if(node == null){
171-
return null;
172-
}
173-
174-
JsonNodeType nodeType = node.getNodeType();
175-
176-
if (nodeType==JsonNodeType.ARRAY){
177-
throw new IllegalStateException("Unsupported type information array .") ;
178-
}
179-
180-
return node;
168+
return nodeAndJsonNodeMapping.get(nodeMappingKey);
181169
}
182170

183171
public void setFailOnMissingField(boolean failOnMissingField) {
@@ -186,6 +174,24 @@ public void setFailOnMissingField(boolean failOnMissingField) {
186174

187175
private void parseTree(JsonNode jsonNode, String prefix){
188176

177+
if (jsonNode.isArray()) {
178+
ArrayNode array = (ArrayNode) jsonNode;
179+
for (int i = 0; i < array.size(); i++) {
180+
JsonNode child = array.get(i);
181+
String nodeKey = getNodeKey(prefix, i);
182+
183+
if (child.isValueNode()) {
184+
nodeAndJsonNodeMapping.put(nodeKey, child);
185+
} else {
186+
if (rowAndFieldMapping.containsValue(nodeKey)) {
187+
nodeAndJsonNodeMapping.put(nodeKey, child);
188+
}
189+
parseTree(child, nodeKey);
190+
}
191+
}
192+
return;
193+
}
194+
189195
Iterator<String> iterator = jsonNode.fieldNames();
190196
while (iterator.hasNext()){
191197
String next = iterator.next();
@@ -195,7 +201,7 @@ private void parseTree(JsonNode jsonNode, String prefix){
195201
if (child.isValueNode()){
196202
nodeAndJsonNodeMapping.put(nodeKey, child);
197203
}else if(child.isArray()){
198-
nodeAndJsonNodeMapping.put(nodeKey, new TextNode(child.toString()));
204+
parseTree(child, nodeKey);
199205
}else {
200206
parseTree(child, nodeKey);
201207
}
@@ -210,6 +216,14 @@ private String getNodeKey(String prefix, String nodeName){
210216
return prefix + "." + nodeName;
211217
}
212218

219+
private String getNodeKey(String prefix, int i) {
220+
if (Strings.isNullOrEmpty(prefix)) {
221+
return "[" + i + "]";
222+
}
223+
224+
return prefix + "[" + i + "]";
225+
}
226+
213227
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
214228
this.fetcher = fetcher;
215229
}

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class KafkaSourceParser extends AbsSourceParser {
4444

4545
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
4646

47-
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
47+
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4848

4949
static {
5050
keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);

0 commit comments

Comments
 (0)