Skip to content

Commit b37b94f

Browse files
吹雪吹雪
authored and committed
流计算设置kafka自定义参数无效
1 parent a8364e9 commit b37b94f

File tree

14 files changed

+345
-34
lines changed

14 files changed

+345
-34
lines changed

core/dependency-reduced-pom.xml

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3+
<parent>
4+
<artifactId>flink.sql</artifactId>
5+
<groupId>com.dtstack.flink</groupId>
6+
<version>1.0-SNAPSHOT</version>
7+
</parent>
8+
<modelVersion>4.0.0</modelVersion>
9+
<artifactId>sql.core</artifactId>
10+
<name>core</name>
11+
<url>http://maven.apache.org</url>
12+
<build>
13+
<resources>
14+
<resource>
15+
<directory>src/main/java/</directory>
16+
</resource>
17+
<resource>
18+
<directory>src/main/resources/</directory>
19+
</resource>
20+
</resources>
21+
<plugins>
22+
<plugin>
23+
<artifactId>maven-shade-plugin</artifactId>
24+
<version>3.1.0</version>
25+
<executions>
26+
<execution>
27+
<phase>package</phase>
28+
<goals>
29+
<goal>shade</goal>
30+
</goals>
31+
<configuration>
32+
<transformers>
33+
<transformer>
34+
<mainClass>com.dtstack.flink.sql.Main</mainClass>
35+
</transformer>
36+
</transformers>
37+
<artifactSet>
38+
<includes>
39+
<include>com.fasterxml.jackson.*</include>
40+
<include>com.google.guava</include>
41+
</includes>
42+
</artifactSet>
43+
<filters>
44+
<filter>
45+
<artifact>*:*</artifact>
46+
<excludes>
47+
<exclude>META-INF/*.SF</exclude>
48+
<exclude>META-INF/*.DSA</exclude>
49+
<exclude>META-INF/*.RSA</exclude>
50+
</excludes>
51+
</filter>
52+
</filters>
53+
</configuration>
54+
</execution>
55+
</executions>
56+
</plugin>
57+
<plugin>
58+
<artifactId>maven-antrun-plugin</artifactId>
59+
<version>1.2</version>
60+
<executions>
61+
<execution>
62+
<id>copy-resources</id>
63+
<phase>package</phase>
64+
<goals>
65+
<goal>run</goal>
66+
</goals>
67+
<configuration>
68+
<tasks>
69+
<copy>
70+
<fileset>
71+
<include />
72+
</fileset>
73+
</copy>
74+
<move />
75+
</tasks>
76+
</configuration>
77+
</execution>
78+
</executions>
79+
</plugin>
80+
</plugins>
81+
</build>
82+
<dependencies>
83+
<dependency>
84+
<groupId>junit</groupId>
85+
<artifactId>junit</artifactId>
86+
<version>4.12</version>
87+
<scope>test</scope>
88+
<exclusions>
89+
<exclusion>
90+
<artifactId>hamcrest-core</artifactId>
91+
<groupId>org.hamcrest</groupId>
92+
</exclusion>
93+
</exclusions>
94+
</dependency>
95+
<dependency>
96+
<groupId>joda-time</groupId>
97+
<artifactId>joda-time</artifactId>
98+
<version>2.5</version>
99+
<scope>compile</scope>
100+
</dependency>
101+
<dependency>
102+
<groupId>org.apache.flink</groupId>
103+
<artifactId>flink-core</artifactId>
104+
<version>1.8.1</version>
105+
<scope>compile</scope>
106+
</dependency>
107+
<dependency>
108+
<groupId>org.apache.flink</groupId>
109+
<artifactId>flink-streaming-java_2.11</artifactId>
110+
<version>1.8.1</version>
111+
<scope>compile</scope>
112+
</dependency>
113+
<dependency>
114+
<groupId>org.apache.flink</groupId>
115+
<artifactId>flink-streaming-scala_2.11</artifactId>
116+
<version>1.8.1</version>
117+
<scope>compile</scope>
118+
</dependency>
119+
<dependency>
120+
<groupId>org.apache.flink</groupId>
121+
<artifactId>flink-table-planner_2.11</artifactId>
122+
<version>1.8.1</version>
123+
<scope>compile</scope>
124+
</dependency>
125+
<dependency>
126+
<groupId>org.apache.flink</groupId>
127+
<artifactId>flink-table-common</artifactId>
128+
<version>1.8.1</version>
129+
<scope>compile</scope>
130+
</dependency>
131+
<dependency>
132+
<groupId>org.apache.calcite</groupId>
133+
<artifactId>calcite-server</artifactId>
134+
<version>1.16.0</version>
135+
<scope>compile</scope>
136+
<exclusions>
137+
<exclusion>
138+
<artifactId>jackson-databind</artifactId>
139+
<groupId>com.fasterxml.jackson.core</groupId>
140+
</exclusion>
141+
</exclusions>
142+
</dependency>
143+
<dependency>
144+
<groupId>org.apache.flink</groupId>
145+
<artifactId>flink-cep-scala_2.11</artifactId>
146+
<version>1.8.1</version>
147+
<scope>compile</scope>
148+
</dependency>
149+
<dependency>
150+
<groupId>org.apache.flink</groupId>
151+
<artifactId>flink-scala_2.11</artifactId>
152+
<version>1.8.1</version>
153+
<scope>compile</scope>
154+
</dependency>
155+
<dependency>
156+
<groupId>org.apache.flink</groupId>
157+
<artifactId>flink-yarn_2.11</artifactId>
158+
<version>1.8.1</version>
159+
<scope>compile</scope>
160+
</dependency>
161+
<dependency>
162+
<groupId>org.apache.flink</groupId>
163+
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
164+
<version>1.8.1</version>
165+
<scope>compile</scope>
166+
</dependency>
167+
</dependencies>
168+
<properties>
169+
<guava.version>19.0</guava.version>
170+
<project.package.name>core</project.package.name>
171+
<jackson.version>2.7.9</jackson.version>
172+
<calcite.server.version>1.16.0</calcite.server.version>
173+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
174+
</properties>
175+
</project>
176+

docs/kafkaSource.md

Lines changed: 64 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ CREATE TABLE tableName(
99
WATERMARK FOR colName AS withOffset( colName , delayTime )
1010
)WITH(
1111
type ='kafka09',
12-
kafka.bootstrap.servers ='ip:port,ip:port...',
13-
kafka.zookeeper.quorum ='ip:port,ip:port/zkparent',
14-
kafka.auto.offset.reset ='latest',
15-
kafka.topic ='topicName',
12+
bootstrapServers ='ip:port,ip:port...',
13+
zookeeperQuorum ='ip:port,ip:port/zkparent',
14+
offsetReset ='latest',
15+
topic ='topicName',
16+
groupId='test',
1617
parallelism ='parllNum',
1718
--timezone='America/Los_Angeles',
1819
timezone='Asia/Shanghai',
@@ -39,16 +40,45 @@ CREATE TABLE tableName(
3940
|参数名称|含义|是否必填|默认值|
4041
|----|---|---|---|
4142
|type | kafka09 ||kafka08、kafka09、kafka10、kafka11、kafka(对应kafka1.0及以上版本)|
42-
|kafka.group.id | 需要读取的 groupId 名称|||
43-
|kafka.bootstrap.servers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
44-
|kafka.zookeeper.quorum | kafka zk地址信息(多个之间用逗号分隔)|||
45-
|kafka.topic | 需要读取的 topic 名称|||
46-
|patterntopic | topic是否是正则表达式格式(true&#124;false) |否| false
47-
|kafka.auto.offset.reset | 读取的topic 的offset初始位置[latest&#124;earliest&#124;指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]||latest|
43+
|groupId | 需要读取的 groupId 名称|||
44+
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
45+
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
46+
|topic | 需要读取的 topic 名称|||
47+
|topicIsPattern | topic是否是正则表达式格式(true&#124;false) |否| false
48+
|offsetReset | 读取的topic 的offset初始位置[latest&#124;earliest&#124;指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]||latest|
4849
|parallelism | 并行度设置||1|
4950
|sourcedatatype | 数据类型||json|
5051
|timezone|时区设置[timezone支持的参数](timeZone.md)|否|'Asia/Shanghai'
5152
**kafka相关参数可以自定义,使用kafka.开头即可。**
53+
```
54+
kafka.consumer.id
55+
kafka.socket.timeout.ms
56+
kafka.fetch.message.max.bytes
57+
kafka.num.consumer.fetchers
58+
kafka.auto.commit.enable
59+
kafka.auto.commit.interval.ms
60+
kafka.queued.max.message.chunks
61+
kafka.rebalance.max.retries
62+
kafka.fetch.min.bytes
63+
kafka.fetch.wait.max.ms
64+
kafka.rebalance.backoff.ms
65+
kafka.refresh.leader.backoff.ms
66+
kafka.consumer.timeout.ms
67+
kafka.exclude.internal.topics
68+
kafka.partition.assignment.strategy
69+
kafka.client.id
70+
kafka.zookeeper.session.timeout.ms
71+
kafka.zookeeper.connection.timeout.ms
72+
kafka.zookeeper.sync.time.ms
73+
kafka.offsets.storage
74+
kafka.offsets.channel.backoff.ms
75+
kafka.offsets.channel.socket.timeout.ms
76+
kafka.offsets.commit.max.retries
77+
kafka.dual.commit.enabled
78+
kafka.partition.assignment.strategy
79+
kafka.socket.receive.buffer.bytes
80+
kafka.fetch.min.bytes
81+
```
5282

5383
## 5.样例:
5484
```
@@ -60,12 +90,12 @@ CREATE TABLE MyTable(
6090
CHARACTER_LENGTH(channel) AS timeLeng
6191
)WITH(
6292
type ='kafka09',
63-
kafka.bootstrap.servers ='172.16.8.198:9092',
64-
kafka.zookeeper.quorum ='172.16.8.198:2181/kafka',
65-
kafka.auto.offset.reset ='latest',
66-
kafka.topic ='nbTest1,nbTest2,nbTest3',
67-
--kafka.topic ='mqTest.*',
68-
--patterntopic='true'
93+
bootstrapServers ='172.16.8.198:9092',
94+
zookeeperQuorum ='172.16.8.198:2181/kafka',
95+
offsetReset ='latest',
96+
topic ='nbTest1,nbTest2,nbTest3',
97+
--topic ='mqTest.*',
98+
--topicIsPattern='true'
6999
parallelism ='1',
70100
sourcedatatype ='json' #可不设置
71101
);
@@ -146,10 +176,10 @@ CREATE TABLE MyTable(
146176
|参数名称|含义|是否必填|默认值|
147177
|----|---|---|---|
148178
|type | kafka09 |||
149-
|kafka.bootstrap.servers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
150-
|kafka.zookeeper.quorum | kafka zk地址信息(多个之间用逗号分隔)|||
151-
|kafka.topic | 需要读取的 topic 名称|||
152-
|kafka.auto.offset.reset | 读取的topic 的offset初始位置[latest&#124;earliest]||latest|
179+
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
180+
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
181+
|topic | 需要读取的 topic 名称|||
182+
|offsetReset | 读取的topic 的offset初始位置[latest&#124;earliest]||latest|
153183
|parallelism | 并行度设置 ||1|
154184
|sourcedatatype | 数据类型||csv|
155185
|fielddelimiter | 字段分隔符|||
@@ -166,12 +196,12 @@ CREATE TABLE MyTable(
166196
CHARACTER_LENGTH(channel) AS timeLeng
167197
)WITH(
168198
type ='kafka09',
169-
kafka.bootstrap.servers ='172.16.8.198:9092',
170-
kafka.zookeeper.quorum ='172.16.8.198:2181/kafka',
171-
kafka.auto.offset.reset ='latest',
172-
kafka.topic ='nbTest1',
173-
--kafka.topic ='mqTest.*',
174-
--kafka.topicIsPattern='true'
199+
bootstrapServers ='172.16.8.198:9092',
200+
zookeeperQuorum ='172.16.8.198:2181/kafka',
201+
offsetReset ='latest',
202+
topic ='nbTest1',
203+
--topic ='mqTest.*',
204+
--topicIsPattern='true'
175205
parallelism ='1',
176206
sourcedatatype ='csv',
177207
fielddelimiter ='\|',
@@ -192,10 +222,10 @@ create table kafka_stream(
192222
_offset BIGINT,
193223
) with (
194224
type ='kafka09',
195-
kafka.bootstrap.servers ='172.16.8.198:9092',
196-
kafka.zookeeper.quorum ='172.16.8.198:2181/kafka',
197-
kafka.auto.offset.reset ='latest',
198-
kafka.topic ='nbTest1',
225+
bootstrapServers ='172.16.8.198:9092',
226+
zookeeperQuorum ='172.16.8.198:2181/kafka',
227+
offsetReset ='latest',
228+
topic ='nbTest1',
199229
parallelism ='1',
200230
sourcedatatype='text'
201231
@@ -205,10 +235,10 @@ create table kafka_stream(
205235
|参数名称|含义|是否必填|默认值|
206236
|----|---|---|---|
207237
|type | kafka09 |||
208-
|kafka.bootstrap.servers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
209-
|kafka.zookeeper.quorum | kafka zk地址信息(多个之间用逗号分隔)|||
210-
|kafka.topic | 需要读取的 topic 名称|||
211-
|kafka.auto.offset.reset | 读取的topic 的offset初始位置[latest&#124;earliest]||latest|
238+
|bootstrapServers | kafka bootstrap-server 地址信息(多个用逗号隔开)|||
239+
|zookeeperQuorum | kafka zk地址信息(多个之间用逗号分隔)|||
240+
|topic | 需要读取的 topic 名称|||
241+
|offsetReset | 读取的topic 的offset初始位置[latest&#124;earliest]||latest|
212242
|parallelism | 并行度设置||1|
213243
|sourcedatatype | 数据类型||text|
214244
**kafka相关参数可以自定义,使用kafka.开头即可。**

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6464
String topicName = kafkaSourceTableInfo.getTopic();
6565

6666
Properties props = new Properties();
67+
for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) {
68+
props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key));
69+
}
6770
props.setProperty("bootstrap.servers", kafkaSourceTableInfo.getBootstrapServers());
6871
if (DtStringUtil.isJosn(kafkaSourceTableInfo.getOffsetReset())) {
6972
props.setProperty("auto.offset.reset", "none");

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
9191
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));
9292
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase())));
9393
kafkaSourceTableInfo.setTimeZone(MathUtil.getString(props.get(KafkaSourceTableInfo.TIME_ZONE_KEY.toLowerCase())));
94+
for (String key : props.keySet()) {
95+
if (!key.isEmpty() && key.startsWith("kafka.")) {
96+
kafkaSourceTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
97+
}
98+
}
9499
kafkaSourceTableInfo.check();
95100
return kafkaSourceTableInfo;
96101
}

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
import com.dtstack.flink.sql.table.SourceTableInfo;
2222
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
2323

24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.Set;
27+
2428
/**
2529
* @author: chuixue
2630
* @create: 2019-11-05 11:09
@@ -113,6 +117,20 @@ public void setOffset(String offset) {
113117
this.offset = offset;
114118
}
115119

120+
public Map<String, String> kafkaParam = new HashMap<>();
121+
122+
public void addKafkaParam(String key, String value) {
123+
kafkaParam.put(key, value);
124+
}
125+
126+
public String getKafkaParam(String key) {
127+
return kafkaParam.get(key);
128+
}
129+
130+
public Set<String> getKafkaParamKeys() {
131+
return kafkaParam.keySet();
132+
}
133+
116134
@Override
117135
public boolean check() {
118136
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6767
String topicName = kafka09SourceTableInfo.getTopic();
6868

6969
Properties props = new Properties();
70+
for (String key : kafka09SourceTableInfo.getKafkaParamKeys()) {
71+
props.setProperty(key, kafka09SourceTableInfo.getKafkaParam(key));
72+
}
7073
props.setProperty("bootstrap.servers", kafka09SourceTableInfo.getBootstrapServers());
7174
if (DtStringUtil.isJosn(kafka09SourceTableInfo.getOffsetReset())){
7275
props.setProperty("auto.offset.reset", "none");

0 commit comments

Comments
 (0)