---
title: Dynamic Kafka
weight: 3
type: docs
aliases:
- /dev/connectors/dynamic-kafka.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Dynamic Kafka Source _`Experimental`_

Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading data from Kafka topics on one or more Kafka clusters.
The Dynamic Kafka connector discovers the clusters and topics through a Kafka metadata service and can adjust what it reads dynamically, so that changes in
topics and/or clusters do not require a job restart. This is especially useful when you need to start reading a new Kafka cluster/topic or stop reading
an existing one (cluster migration, failover, or other infrastructure changes), and when you need direct integration with the Hybrid Source. The connector
automates these operations so that they are transparent to Kafka consumers.

## Dependency

For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).

{{< connector_artifact flink-connector-kafka 3.1.0 >}}

Flink's streaming connectors are not part of the binary distribution.
See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).

## Dynamic Kafka Source
{{< hint info >}}
This part describes the Dynamic Kafka Source based on the new
[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
{{< /hint >}}

### Usage

The Dynamic Kafka Source provides a builder class for constructing a DynamicKafkaSource. The code snippet
below shows how to build a DynamicKafkaSource to consume messages from the earliest offset of the
stream "input-stream" and deserialize only the value of the
ConsumerRecord as a string, using "MyKafkaMetadataService" to resolve the cluster(s) and topic(s)
corresponding to "input-stream".

{{< tabs "DynamicKafkaSource" >}}
{{< tab "Java" >}}
```java
DynamicKafkaSource<String> source = DynamicKafkaSource.<String>builder()
    .setKafkaMetadataService(new MyKafkaMetadataService())
    .setStreamIds(Collections.singleton("input-stream"))
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
    .setProperties(properties)
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Dynamic Kafka Source");
```
{{< /tab >}}
{{< /tabs >}}
The following properties are **required** for building a DynamicKafkaSource:

* The Kafka metadata service, configured by setKafkaMetadataService(KafkaMetadataService).
* The stream ids to subscribe to, see the following Kafka stream subscription section for more details.
* The deserializer to parse Kafka messages, see the [Kafka Source documentation]({{< ref "docs/connectors/datastream/kafka" >}}#deserializer) for more details.

### Kafka Stream Subscription
The Dynamic Kafka Source provides two ways of subscribing to Kafka stream(s).
* A set of Kafka stream ids. For example:
  {{< tabs "DynamicKafkaSource#setStreamIds" >}}
  {{< tab "Java" >}}
  ```java
  DynamicKafkaSource.builder().setStreamIds(Set.of("stream-a", "stream-b"));
  ```
  {{< /tab >}}
  {{< /tabs >}}
* A regex pattern that subscribes to all Kafka stream ids matching the provided regex. For example:
  {{< tabs "DynamicKafkaSource#setStreamPattern" >}}
  {{< tab "Java" >}}
  ```java
  DynamicKafkaSource.builder().setStreamPattern(Pattern.compile("stream.*"));
  ```
  {{< /tab >}}
  {{< /tabs >}}

### Kafka Metadata Service

An interface is provided to resolve the logical Kafka stream(s) into the corresponding physical
topic(s) and cluster(s). Typically, implementations are backed by services that align well
with your internal Kafka infrastructure; if no such service is available, an in-memory implementation
also works. An example of an in-memory implementation can be found in our tests, and a simplified sketch is shown below.

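As an illustration, the sketch below outlines what a simple in-memory implementation could look like. It is only a sketch: it assumes the `KafkaMetadataService`, `KafkaStream`, and `ClusterMetadata` types and the method names described in [FLIP-246](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320) (`getAllStreams`, `describeStreams`, `isClusterActive`), and the class `InMemoryKafkaMetadataService` itself is hypothetical.

{{< tabs "DynamicKafkaSource#inMemoryMetadataService" >}}
{{< tab "Java" >}}
```java
// The import locations below are assumptions and may differ between connector versions.
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory KafkaMetadataService, modeled on the interface described in FLIP-246.
public class InMemoryKafkaMetadataService implements KafkaMetadataService {

    // streamId -> stream metadata (clusters, topics, and connection properties)
    private Map<String, KafkaStream> streams;

    public InMemoryKafkaMetadataService(Map<String, KafkaStream> streams) {
        this.streams = streams;
    }

    // Not part of the interface: lets this example change the metadata between discovery polls.
    public void setStreams(Map<String, KafkaStream> streams) {
        this.streams = streams;
    }

    @Override
    public Set<KafkaStream> getAllStreams() {
        return new HashSet<>(streams.values());
    }

    @Override
    public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
        Map<String, KafkaStream> result = new HashMap<>();
        for (String streamId : streamIds) {
            KafkaStream stream = streams.get(streamId);
            if (stream != null) {
                result.put(streamId, stream);
            }
        }
        return result;
    }

    @Override
    public boolean isClusterActive(String kafkaClusterId) {
        // A cluster is considered active as long as at least one stream still references it.
        return streams.values().stream()
                .anyMatch(s -> s.getClusterMetadataMap().containsKey(kafkaClusterId));
    }

    @Override
    public void close() {}
}
```
{{< /tab >}}
{{< /tabs >}}
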
This source achieves its dynamic characteristic by periodically polling the Kafka metadata service
for changes to the Kafka stream(s) and reconciling the reader tasks so that they subscribe to the new
Kafka metadata returned by the service. For example, in the case of a Kafka migration, the source would
swap from the old cluster to the new cluster once the service reflects that change in the Kafka stream metadata.

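To make the dynamic behavior concrete, the hypothetical `InMemoryKafkaMetadataService` from the sketch above could be used to simulate such a migration. The cluster ids, topics, and bootstrap servers below are made-up values, and the `KafkaStream`/`ClusterMetadata` constructor shapes are assumed from FLIP-246:

{{< tabs "DynamicKafkaSource#clusterMigration" >}}
{{< tab "Java" >}}
```java
// The stream "input-stream" initially lives on "cluster-a" (hypothetical values).
Properties clusterAProps = new Properties();
clusterAProps.setProperty("bootstrap.servers", "cluster-a:9092");
KafkaStream onClusterA = new KafkaStream(
        "input-stream",
        Map.of("cluster-a", new ClusterMetadata(Set.of("topic-a"), clusterAProps)));

InMemoryKafkaMetadataService metadataService =
        new InMemoryKafkaMetadataService(Map.of("input-stream", onClusterA));

// Later, the stream is migrated to "cluster-b". After the next metadata discovery poll
// (see stream-metadata-discovery-interval-ms in the next section), the source stops reading
// "cluster-a" and starts reading "cluster-b" without a job restart.
Properties clusterBProps = new Properties();
clusterBProps.setProperty("bootstrap.servers", "cluster-b:9092");
KafkaStream onClusterB = new KafkaStream(
        "input-stream",
        Map.of("cluster-b", new ClusterMetadata(Set.of("topic-b"), clusterBProps)));
metadataService.setStreams(Map.of("input-stream", onClusterB));
```
{{< /tab >}}
{{< /tabs >}}
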
### Additional Properties
The configuration options in DynamicKafkaSourceOptions can be set on the properties passed to the builder:
<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left" style="width: 25%">Option</th>
      <th class="text-center" style="width: 8%">Required</th>
      <th class="text-center" style="width: 7%">Default</th>
      <th class="text-center" style="width: 10%">Type</th>
      <th class="text-center" style="width: 50%">Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><h5>stream-metadata-discovery-interval-ms</h5></td>
      <td>optional</td>
      <td style="word-wrap: break-word;">-1</td>
      <td>Long</td>
      <td>The interval in milliseconds at which the source discovers changes in stream metadata. A non-positive value disables stream metadata discovery.</td>
    </tr>
    <tr>
      <td><h5>stream-metadata-discovery-failure-threshold</h5></td>
      <td>optional</td>
      <td style="word-wrap: break-word;">1</td>
      <td>Integer</td>
      <td>The number of consecutive failures before an exception from Kafka metadata service discovery triggers a jobmanager failure and global failover. The default is one, so that at least startup failures are caught.</td>
    </tr>
  </tbody>
</table>

In addition to this list, see the [regular Kafka connector]({{< ref "docs/connectors/datastream/kafka" >}}#additional-properties) for
a list of applicable properties.
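
As a small sketch of how these options are wired in, they can be set on the `Properties` object that is handed to the builder via `setProperties(properties)` (the values below are just examples):

{{< tabs "DynamicKafkaSourceOptions#properties" >}}
{{< tab "Java" >}}
```java
Properties properties = new Properties();
// Check the Kafka metadata service for stream metadata changes every 60 seconds.
properties.setProperty("stream-metadata-discovery-interval-ms", "60000");
// Only fail the job after two consecutive metadata discovery failures.
properties.setProperty("stream-metadata-discovery-failure-threshold", "2");

// The properties object is then passed to the builder, as shown in the Usage section:
// DynamicKafkaSource.<String>builder()... .setProperties(properties).build();
```
{{< /tab >}}
{{< /tabs >}}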

### Metrics

<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left" style="width: 15%">Scope</th>
      <th class="text-left" style="width: 18%">Metrics</th>
      <th class="text-left" style="width: 18%">User Variables</th>
      <th class="text-left" style="width: 39%">Description</th>
      <th class="text-left" style="width: 10%">Type</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th rowspan="5">Operator</th>
      <td>currentEmitEventTimeLag</td>
      <td>n/a</td>
      <td>The time span from the record event timestamp to the time the record is emitted by the source connector¹: <code>currentEmitEventTimeLag = EmitTime - EventTime</code></td>
      <td>Gauge</td>
    </tr>
    <tr>
      <td>watermarkLag</td>
      <td>n/a</td>
      <td>The time span that the watermark lags behind the wall clock time: <code>watermarkLag = CurrentTime - Watermark</code></td>
      <td>Gauge</td>
    </tr>
    <tr>
      <td>sourceIdleTime</td>
      <td>n/a</td>
      <td>The time span that the source has not processed any record: <code>sourceIdleTime = CurrentTime - LastRecordProcessTime</code></td>
      <td>Gauge</td>
    </tr>
    <tr>
      <td>pendingRecords</td>
      <td>n/a</td>
      <td>The number of records that have not been fetched by the source, e.g. the available records after the consumer offset in a Kafka partition.</td>
      <td>Gauge</td>
    </tr>
    <tr>
      <td>kafkaClustersCount</td>
      <td>n/a</td>
      <td>The total number of Kafka clusters read by this reader.</td>
      <td>Gauge</td>
    </tr>
  </tbody>
</table>

In addition to this list, see the [regular Kafka connector]({{< ref "docs/connectors/datastream/kafka" >}}#monitoring) for
the KafkaSourceReader metrics that are also reported.

### Additional Details

For additional details on deserialization, event time and watermarks, idleness, consumer offset
committing, security, and more, you can refer to the [Kafka Source documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-source). This is possible because the
Dynamic Kafka Source leverages components of the Kafka Source, and the implementation is
discussed in the next section.

### Behind the Scene
{{< hint info >}}
If you are interested in how the Dynamic Kafka Source works under the design of the new data source API, you may
want to read this part as a reference. For details about the new data source API, the
[documentation of data sources]({{< ref "docs/dev/datastream/sources.md" >}}) and
<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>
provide more descriptive discussions.
{{< /hint >}}

Under the abstraction of the new data source API, the Dynamic Kafka Source consists of the following components:
#### Source Split
A source split in the Dynamic Kafka Source represents a partition of a Kafka topic, along with cluster information. It
consists of:
* A Kafka cluster id that can be resolved by the Kafka metadata service.
* A Kafka Source Split (TopicPartition, starting offset, stopping offset).

You can check the class `DynamicKafkaSourceSplit` for more details.
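
As a rough illustration only (the constructor shapes below are assumptions and may differ between connector versions), a split pairs a Kafka cluster id with a regular Kafka partition split:

{{< tabs "DynamicKafkaSourceSplit#example" >}}
{{< tab "Java" >}}
```java
// Assumed constructors, for illustration; see DynamicKafkaSourceSplit and
// KafkaPartitionSplit in the connector for the authoritative definitions.
KafkaPartitionSplit partitionSplit =
        new KafkaPartitionSplit(new TopicPartition("topic-a", 0), 0L);
DynamicKafkaSourceSplit split = new DynamicKafkaSourceSplit("cluster-a", partitionSplit);
```
{{< /tab >}}
{{< /tabs >}}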

#### Split Enumerator

This enumerator is responsible for discovering and assigning splits from one or more clusters. At startup, the
enumerator discovers the metadata belonging to the configured Kafka stream ids. Using that metadata, it
initializes KafkaSourceEnumerators to handle the work of assigning splits to the readers. In addition,
source events are sent to the source readers to reconcile the metadata. The enumerator polls the
KafkaMetadataService periodically for stream discovery. When metadata changes require restarting the underlying
enumerators, outdated metrics are cleared as well, since removed clusters should no longer report metrics.

#### Source Reader

This reader is responsible for reading from one or more clusters, using the KafkaSourceReader to fetch
records from topics and clusters based on the metadata. When new metadata is discovered by the enumerator,
the reader reconciles the metadata changes and may restart the KafkaSourceReader to read from the new
set of topics and clusters.

#### Kafka Metadata Service

This interface represents the source of truth for the current metadata of the configured Kafka stream ids.
Metadata that is removed between polls is considered non-active (e.g. removing a cluster from the
return value means that the cluster is non-active and should not be read from). The cluster metadata
contains an immutable Kafka cluster id, the set of topics, and the properties needed to connect to that
Kafka cluster.

#### FLIP-246

To understand more of what happens behind the scenes, please read [FLIP-246](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320)
for further details and discussion.