
Commit eaeb781

mas-chen authored and mxm committed

[FLINK-32416] initial implementation of DynamicKafkaSource with bounded/unbounded support and unit/integration tests

add dynamic kafka source docs

1 parent 825052f commit eaeb781

File tree

51 files changed: +7891 -13 lines changed

Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
---
title: Kafka
weight: 3
type: docs
aliases:
- /zh/dev/connectors/dynamic-kafka.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Dynamic Kafka Source _`Experimental`_

Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading data from and
writing data to Kafka topics across one or more Kafka clusters. The connector does this dynamically,
without requiring a job restart, by relying on a Kafka metadata service to track changes in
topics and/or clusters. This is especially useful for transparently adding or removing Kafka clusters
and topics without restarting the Flink job, and for direct integration with the Hybrid Source.

## Dependency

For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).

{{< connector_artifact flink-connector-kafka 3.1.0 >}}

Flink's streaming connectors are not part of the binary distribution.
See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).

## Dynamic Kafka Source
{{< hint info >}}
This part describes the Dynamic Kafka Source based on the new
[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
{{< /hint >}}

## Usage

The Dynamic Kafka Source provides a builder class to construct a DynamicKafkaSource. The code snippet
below shows how to build a DynamicKafkaSource that consumes messages from the earliest offset of the
stream "input-stream" and deserializes only the value of each
ConsumerRecord as a string, using "MyKafkaMetadataService" to resolve the cluster(s) and topic(s)
corresponding to "input-stream".

{{< tabs "KafkaSource" >}}
{{< tab "Java" >}}
```java

DynamicKafkaSource<String> source = DynamicKafkaSource.<String>builder()
    .setKafkaMetadataService(new MyKafkaMetadataService())
    .setStreamIds(Collections.singleton("input-stream"))
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
    .setProperties(properties)
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```
{{< /tab >}}
{{< /tabs >}}

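The `properties` object passed to the builder carries consumer settings shared by every resolved cluster; as described in the Kafka Metadata Service sections below, cluster-specific connection details such as bootstrap servers are supplied by the metadata service. A minimal sketch (the group id is just an example value):

```java
// Shared consumer settings for all underlying Kafka consumers. Cluster-specific connection
// details, such as bootstrap servers, come from the Kafka metadata service instead.
Properties properties = new Properties();
properties.setProperty("group.id", "my-consumer-group");
```
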
### Kafka Metadata Service

An interface is provided to resolve the logical Kafka stream(s) into the corresponding physical
topic(s) and cluster(s). Typically, implementations are backed by services that align well
with internal Kafka infrastructure; if no such service is available, an in-memory implementation
works as well. An example of an in-memory implementation can be found in our tests, and a minimal
sketch is shown below.

This source achieves its dynamic behavior by periodically polling this Kafka metadata service
for any changes to the Kafka stream(s) and reconciling the reader tasks so that they subscribe to the
new Kafka metadata returned by the service. For example, in the case of a Kafka cluster migration, the
source would swap from the old cluster to the new cluster when the service makes that change in the
Kafka stream metadata.

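The following is a minimal sketch of such an in-memory implementation. It assumes the `KafkaMetadataService` interface exposes `getAllStreams()`, `describeStreams(...)`, and `isClusterActive(...)` over the `KafkaStream`/`ClusterMetadata` model described in FLIP-246; the class name, package paths, and getter names are illustrative and may differ from the actual code.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Package paths are assumed; adjust to where these types live in the connector.
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;

/**
 * A static, in-memory metadata service: the stream-to-cluster/topic mapping is fixed at
 * construction time, so it is mainly useful for tests or setups without a central metadata service.
 */
public class InMemoryKafkaMetadataService implements KafkaMetadataService {

    private final Map<String, KafkaStream> streamsById = new HashMap<>();

    public InMemoryKafkaMetadataService(Set<KafkaStream> streams) {
        for (KafkaStream stream : streams) {
            streamsById.put(stream.getStreamId(), stream);
        }
    }

    @Override
    public Set<KafkaStream> getAllStreams() {
        return new HashSet<>(streamsById.values());
    }

    @Override
    public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
        Map<String, KafkaStream> result = new HashMap<>();
        for (String streamId : streamIds) {
            KafkaStream stream = streamsById.get(streamId);
            if (stream != null) {
                result.put(streamId, stream);
            }
        }
        return result;
    }

    @Override
    public boolean isClusterActive(String kafkaClusterId) {
        // A cluster is considered active as long as at least one known stream still references it.
        return streamsById.values().stream()
                .anyMatch(s -> s.getClusterMetadataMap().containsKey(kafkaClusterId));
    }

    @Override
    public void close() {}
}
```

Such a service could then be plugged into the builder above via `setKafkaMetadataService(new InMemoryKafkaMetadataService(streams))`.
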
### Additional Details

For additional details on deserialization, event time and watermarks, idleness, consumer offset
committing, security, and more, refer to the Kafka Source documentation. This is possible because the
Dynamic Kafka Source builds on components of the Kafka Source; the implementation is
discussed in the next section.

### Behind the Scenes
{{< hint info >}}
If you are interested in how the Dynamic Kafka Source works under the new data source API, you may
want to read this part as a reference. For details about the new data source API, the
[documentation of data source]({{< ref "docs/dev/datastream/sources.md" >}}) and
<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>
provide more descriptive discussions.
{{< /hint >}}

Under the abstraction of the new data source API, the Dynamic Kafka Source consists of the following components:

#### Source Split

A source split in the Dynamic Kafka Source represents a partition of a Kafka topic, together with cluster information. It
consists of:

* A Kafka cluster id that can be resolved by the Kafka metadata service.
* A Kafka Source Split (TopicPartition, starting offset, stopping offset).

You can check the class `DynamicKafkaSourceSplit` for more details; an illustrative sketch follows.

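For illustration only, the sketch below shows what such a split carries; splits are created internally by the enumerator rather than by users, and the constructor shape and package paths are assumptions based on the description above.

```java
// Package paths and the DynamicKafkaSourceSplit constructor are assumed for illustration.
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

import org.apache.kafka.common.TopicPartition;

// The wrapped KafkaPartitionSplit carries the TopicPartition plus starting/stopping offsets,
// exactly as in the regular Kafka Source.
KafkaPartitionSplit partitionSplit =
        new KafkaPartitionSplit(
                new TopicPartition("input-topic", 0),
                KafkaPartitionSplit.EARLIEST_OFFSET,
                KafkaPartitionSplit.NO_STOPPING_OFFSET);

// The Kafka cluster id makes the split unique across clusters: the same TopicPartition on two
// different clusters yields two distinct splits.
DynamicKafkaSourceSplit split = new DynamicKafkaSourceSplit("cluster-0", partitionSplit);
```
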
#### Split Enumerator

This enumerator is responsible for discovering and assigning splits from one or more clusters. At startup, the
enumerator discovers the metadata belonging to the configured Kafka stream ids. Using that metadata, it
initializes KafkaSourceEnumerators to handle assigning splits to the readers. In addition,
source events are sent to the source readers to reconcile the metadata. The enumerator can also poll the
KafkaMetadataService periodically for stream discovery. Furthermore, restarting the enumerators when the
metadata changes involves clearing outdated metrics, since removed clusters should no longer report metrics.

#### Source Reader

This reader is responsible for reading from one or more clusters, using KafkaSourceReaders to fetch
records from topics and clusters based on the current metadata. When new metadata is discovered by the enumerator,
the reader reconciles the metadata changes, possibly restarting the underlying KafkaSourceReaders so that they read
from the new set of topics and clusters.

#### Kafka Metadata Service

This interface represents the source of truth for the current metadata of the configured Kafka stream ids.
Metadata that is removed in between polls is considered non-active (e.g. removing a cluster from the
return value means that the cluster is non-active and should not be read from). The cluster metadata
contains an immutable Kafka cluster id, the set of topics, and the properties needed to connect to the
Kafka cluster. A sketch of this metadata model follows.

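As a hedged illustration of this model, the sketch below builds the metadata a service might return for one logical stream backed by two clusters; the `ClusterMetadata` and `KafkaStream` constructor shapes and package paths are assumptions based on the description above and FLIP-246.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Package paths and constructors are assumed for illustration.
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;

// Per-cluster metadata: the topics backing the stream on that cluster plus the connection
// properties needed to read from it (e.g. bootstrap servers).
Properties cluster0Props = new Properties();
cluster0Props.setProperty("bootstrap.servers", "broker-0:9092");
ClusterMetadata cluster0 = new ClusterMetadata(Set.of("input-topic"), cluster0Props);

Properties cluster1Props = new Properties();
cluster1Props.setProperty("bootstrap.servers", "broker-1:9092");
ClusterMetadata cluster1 = new ClusterMetadata(Set.of("input-topic"), cluster1Props);

// One logical stream currently backed by two physical clusters. If a later poll returns the same
// stream without "cluster-0", that cluster becomes non-active and should no longer be read from.
Map<String, ClusterMetadata> clusters = new HashMap<>();
clusters.put("cluster-0", cluster0);
clusters.put("cluster-1", cluster1);
KafkaStream inputStream = new KafkaStream("input-stream", clusters);
```
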
#### FLIP-246

To understand more of what happens behind the scenes, please read [FLIP-246](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320)
for more details and discussion.

flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -171,7 +171,7 @@ under the License.
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>32.1.2-jre</version>
+      <version>${guava.version}</version>
     </dependency>
   </dependencies>

@@ -250,7 +250,7 @@ under the License.
       <artifactItem>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
-        <version>32.1.2-jre</version>
+        <version>${guava.version}</version>
         <destFileName>guava.jar</destFileName>
         <type>jar</type>
         <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
