
Commit bfa4d45

Merge branch 'v1.8.0_dev' of ssh://git.dtstack.cn:10022/dtstack/dt-center-flinkStreamSQL into 1.8.0_dev_clickhouse
2 parents 4f2e9a9 + 138a873 commit bfa4d45

File tree

41 files changed: +3844 -63 lines


README.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -17,7 +17,7 @@
 2. Fixed the exception where yarnPer-mode submission fails.
 
 # Supported
-* Source tables: kafka 0.9 and 1.x
+* Source tables: kafka 0.9, 1.x and above
 * Dimension tables: mysql, SQlServer, oracle, hbase, mongo, redis, cassandra, serversocket
 * Result tables: mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console
```

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 13 additions & 7 deletions
```diff
@@ -66,24 +66,30 @@ public static List<String> splitIgnoreQuota(String str, char delimiter){
             if (idx > 0) {
                 flag = chars[idx - 1];
             }
-            if(c == delimiter){
+            if (c == delimiter) {
                 if (inQuotes) {
                     b.append(c);
-                } else if(inSingleQuotes){
+                } else if (inSingleQuotes) {
                     b.append(c);
-                } else if(bracketLeftNum > 0){
+                } else if (bracketLeftNum > 0) {
                     b.append(c);
-                }else {
+                } else {
                     tokensList.add(b.toString());
                     b = new StringBuilder();
                 }
-            }else if(c == '\"' && '\\'!=flag && !inSingleQuotes){
+            } else if (c == '\"' && '\\' != flag && !inSingleQuotes) {
                 inQuotes = !inQuotes;
                 b.append(c);
-            }else if(c == '\'' && '\\'!=flag && !inQuotes){
+            } else if (c == '\'' && '\\' != flag && !inQuotes) {
                 inSingleQuotes = !inSingleQuotes;
                 b.append(c);
-            }else{
+            } else if (c == '(' && !inSingleQuotes && !inQuotes) {
+                bracketLeftNum++;
+                b.append(c);
+            } else if (c == ')' && !inSingleQuotes && !inQuotes) {
+                bracketLeftNum--;
+                b.append(c);
+            } else {
                 b.append(c);
             }
             idx++;
```
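Beyond the whitespace cleanup, this change makes the splitter bracket-aware: `bracketLeftNum` now tracks unbalanced parentheses outside quotes, so a delimiter inside parentheses no longer splits the token. A minimal sketch of the expected behavior (the input string here is hypothetical; the method signature comes from the hunk header above):

```java
import com.dtstack.flink.sql.util.DtStringUtil;

import java.util.List;

public class SplitDemo {
    public static void main(String[] args) {
        // With the bracket-aware change, the comma inside f(a,b) no longer
        // splits; only the top-level comma does.
        List<String> tokens = DtStringUtil.splitIgnoreQuota("f(a,b),c", ',');
        System.out.println(tokens); // expected: [f(a,b), c] instead of [f(a, b), c]
    }
}
```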

docs/kafkaSource.md

Lines changed: 70 additions & 2 deletions
````diff
@@ -1,4 +1,3 @@
-# 1. JSON-format data source
 ## 1. Format:
 ```
 Data currently supports the JSON format {"xx":"bb","cc":"dd"}
@@ -22,7 +21,7 @@ CREATE TABLE tableName(
 ```
 
 ## 2. Supported versions
-kafka08, kafka09, kafka10, kafka11
+kafka08, kafka09, kafka10, kafka11 and above
 **The Kafka versions used for reading and writing must match; otherwise compatibility errors occur.**
 
 ## 3. Table schema definition
@@ -71,6 +70,75 @@ CREATE TABLE MyTable(
     sourcedatatype ='json' # optional
 );
 ```
+## 6. Nested JSON and array-type field parsing
+
+Nested JSON parsing example
+
+json: {"name":"tom", "obj":{"channel": "root"}, "pv": 4, "xctime":1572932485}
+```
+CREATE TABLE MyTable(
+    name varchar,
+    obj.channel varchar as channel,
+    pv INT,
+    xctime bigint,
+    CHARACTER_LENGTH(channel) AS timeLeng
+)WITH(
+    type ='kafka09',
+    bootstrapServers ='172.16.8.198:9092',
+    zookeeperQuorum ='172.16.8.198:2181/kafka',
+    offsetReset ='latest',
+    groupId='nbTest',
+    topic ='nbTest1,nbTest2,nbTest3',
+    --- topic ='mqTest.*',
+    ---topicIsPattern='true',
+    parallelism ='1'
+);
+```
+
+Array-type field parsing example
+
+json: {"name":"tom", "obj":{"channel": "root"}, "user": [{"pv": 4}, {"pv": 10}], "xctime":1572932485}
+```
+CREATE TABLE MyTable(
+    name varchar,
+    obj.channel varchar as channel,
+    user[1].pv INT as pv,
+    xctime bigint,
+    CHARACTER_LENGTH(channel) AS timeLeng
+)WITH(
+    type ='kafka09',
+    bootstrapServers ='172.16.8.198:9092',
+    zookeeperQuorum ='172.16.8.198:2181/kafka',
+    offsetReset ='latest',
+    groupId='nbTest',
+    topic ='nbTest1,nbTest2,nbTest3',
+    --- topic ='mqTest.*',
+    ---topicIsPattern='true',
+    parallelism ='1'
+);
+```
+or
+
+json: {"name":"tom", "obj":{"channel": "root"}, "pv": [4, 7, 10], "xctime":1572932485}
+```
+CREATE TABLE MyTable(
+    name varchar,
+    obj.channel varchar as channel,
+    pv[1] INT as pv,
+    xctime bigint,
+    CHARACTER_LENGTH(channel) AS timeLeng
+)WITH(
+    type ='kafka09',
+    bootstrapServers ='172.16.8.198:9092',
+    zookeeperQuorum ='172.16.8.198:2181/kafka',
+    offsetReset ='latest',
+    groupId='nbTest',
+    topic ='nbTest1,nbTest2,nbTest3',
+    --- topic ='mqTest.*',
+    ---topicIsPattern='true',
+    parallelism ='1'
+);
+```
 # 2. CSV-format data source
 Data is split by the field delimiter and matched, in order, against the columns configured in the SQL. If the number of split columns equals the number of configured columns, they match directly; otherwise the lengthcheckpolicy strategy applies.
 ## 1. Parameters:
````
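The dotted-path and index syntax in the new section (`obj.channel`, `user[1].pv`, `pv[1]`) maps one column in the table schema to one node in the incoming JSON. For intuition only, here are the same lookups expressed with Jackson; the connector's actual deserializer is not part of this commit excerpt, and Jackson's indices are 0-based regardless of how the doc's `[1]` is interpreted:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NestedJsonDemo {
    public static void main(String[] args) throws Exception {
        String json = "{\"name\":\"tom\", \"obj\":{\"channel\": \"root\"}, "
                + "\"user\": [{\"pv\": 4}, {\"pv\": 10}], \"xctime\":1572932485}";

        JsonNode root = new ObjectMapper().readTree(json);

        // obj.channel varchar as channel  ->  field of a nested object
        String channel = root.path("obj").path("channel").asText();

        // user[1].pv INT as pv  ->  field of one element of an array
        int pv = root.path("user").path(0).path("pv").asInt();

        System.out.println(channel + " " + pv); // root 4
    }
}
```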

kafka/kafka-sink/pom.xml

Lines changed: 89 additions & 0 deletions
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sql.kafka</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.sink.kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-sink</name>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.slf4j</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../plugins/kafkasink">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar"/>
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../plugins/kafkasink/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../plugins/kafkasink/${project.name}-${git.branch}.jar"/>
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
```
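In short, the shade plugin builds a fat jar at the package phase (excluding org.slf4j and jar signature files), and the antrun execution then copies that jar into plugins/kafkasink and renames it from `${project.artifactId}-${project.version}.jar` to `${project.name}-${git.branch}.jar`, so deployed plugin jars are keyed by module name and git branch.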
CustomerFlinkKafkaProducer.java

Lines changed: 60 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.metric.MetricConstant;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * A FlinkKafkaProducer that registers DT records-out metrics and hands the
 * counter to its serialization schema. Note that the generic parameter is
 * named Row, so it shadows any Row class rather than referring to one.
 *
 * @author: chuixue
 * @create: 2019-11-05 11:54
 **/
public class CustomerFlinkKafkaProducer<Row> extends FlinkKafkaProducer<Row> {

    CustomerJsonRowSerializationSchema schema;

    public CustomerFlinkKafkaProducer(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig) {
        super(topicId, serializationSchema, producerConfig);
        this.schema = (CustomerJsonRowSerializationSchema) serializationSchema;
    }

    @Override
    public void open(Configuration configuration) {
        // Register a records-out counter plus a rate meter over a 20-second window.
        RuntimeContext ctx = getRuntimeContext();
        Counter counter = ctx.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
        MeterView meter = ctx.getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(counter, 20));

        // Let the schema increment the counter for every record it serializes.
        schema.setCounter(counter);

        try {
            super.open(configuration);
        } catch (Exception e) {
            throw new RuntimeException("Failed to open CustomerFlinkKafkaProducer", e);
        }
    }
}
```
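The wiring pattern here is: keep a typed reference to the schema, then hand it the freshly registered Counter once open() has a RuntimeContext. CustomerJsonRowSerializationSchema itself is not part of this excerpt, so as a stand-in, here is a minimal self-contained sketch of a schema that accepts such a counter (the class name and String payload are illustrative; only SerializationSchema and Counter are real Flink types):

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.metrics.Counter;

import java.nio.charset.StandardCharsets;

// Illustrative metric-aware schema: counts every record it serializes.
public class CountingStringSchema implements SerializationSchema<String> {

    private transient Counter counter;

    // Called by the owning producer once the runtime Counter exists (see open() above).
    public void setCounter(Counter counter) {
        this.counter = counter;
    }

    @Override
    public byte[] serialize(String element) {
        if (counter != null) {
            counter.inc(); // one record out
        }
        return element.getBytes(StandardCharsets.UTF_8);
    }
}
```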
