@@ -15,30 +15,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import warnings
 from abc import ABC, abstractmethod
 from enum import Enum
-from typing import Dict, Union, List, Set, Callable, Any, Optional
+from typing import Dict, Union, Set, Callable, Any, Optional
 
 from py4j.java_gateway import JavaObject, get_java_class
-
-from pyflink.common import DeserializationSchema, TypeInformation, typeinfo, SerializationSchema, \
+from pyflink.common import DeserializationSchema, SerializationSchema, \
     Types, Row
 from pyflink.datastream.connectors import Source, Sink
 from pyflink.datastream.connectors.base import DeliveryGuarantee, SupportsPreprocessing, \
     StreamTransformer
-from pyflink.datastream.functions import SinkFunction, SourceFunction
 from pyflink.java_gateway import get_gateway
 from pyflink.util.java_utils import to_jarray, get_field, get_field_value
 
 __all__ = [
-    'FlinkKafkaConsumer',
-    'FlinkKafkaProducer',
     'KafkaSource',
     'KafkaSourceBuilder',
     'KafkaSink',
     'KafkaSinkBuilder',
-    'Semantic',
     'KafkaTopicPartition',
     'KafkaOffsetsInitializer',
     'KafkaOffsetResetStrategy',
@@ -48,309 +42,6 @@
 ]
 
 
-# ---- FlinkKafkaConsumer ----
-
-class FlinkKafkaConsumerBase(SourceFunction, ABC):
-    """
-    Base class of all Flink Kafka Consumer data sources. This implements the common behavior across
-    all Kafka versions.
-
-    The Kafka version specific behavior is defined mainly in the specific subclasses.
-    """
-
-    def __init__(self, j_flink_kafka_consumer):
-        super(FlinkKafkaConsumerBase, self).__init__(source_func=j_flink_kafka_consumer)
-
-    def set_commit_offsets_on_checkpoints(self,
-                                          commit_on_checkpoints: bool) -> 'FlinkKafkaConsumerBase':
-        """
-        Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
-        This setting will only have effect if checkpointing is enabled for the job. If checkpointing
-        isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
-        property settings will be used.
-        """
-        self._j_function = self._j_function \
-            .setCommitOffsetsOnCheckpoints(commit_on_checkpoints)
-        return self
-
-    def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase':
-        """
-        Specifies the consumer to start reading from the earliest offset for all partitions. This
-        lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
-
-        This method does not affect where partitions are read from when the consumer is restored
-        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
-        savepoint, only the offsets in the restored state will be used.
-        """
-        self._j_function = self._j_function.setStartFromEarliest()
-        return self
-
-    def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase':
-        """
-        Specifies the consumer to start reading from the latest offset for all partitions. This lets
-        the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
-
-        This method does not affect where partitions are read from when the consumer is restored
-        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
-        savepoint, only the offsets in the restored state will be used.
-        """
-        self._j_function = self._j_function.setStartFromLatest()
-        return self
-
-    def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 'FlinkKafkaConsumerBase':
-        """
-        Specifies the consumer to start reading partitions from a specified timestamp. The specified
-        timestamp must be before the current timestamp. This lets the consumer ignore any committed
-        group offsets in Zookeeper / Kafka brokers.
-
-        The consumer will look up the earliest offset whose timestamp is greater than or equal to
-        the specific timestamp from Kafka. If there's no such offset, the consumer will use the
-        latest offset to read data from Kafka.
-
-        This method does not affect where partitions are read from when the consumer is restored
-        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
-        savepoint, only the offsets in the restored state will be used.
-
-        :param startup_offsets_timestamp: timestamp for the startup offsets, as milliseconds from
-                                          epoch.
-        """
-        self._j_function = self._j_function.setStartFromTimestamp(
-            startup_offsets_timestamp)
-        return self
-
-    def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase':
-        """
-        Specifies the consumer to start reading from any committed group offsets found in Zookeeper /
-        Kafka brokers. The 'group.id' property must be set in the configuration properties. If no
-        offset can be found for a partition, the behaviour in 'auto.offset.reset' set in the
-        configuration properties will be used for the partition.
-
-        This method does not affect where partitions are read from when the consumer is restored
-        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
-        savepoint, only the offsets in the restored state will be used.
-        """
-        self._j_function = self._j_function.setStartFromGroupOffsets()
-        return self
-
-    def disable_filter_restored_partitions_with_subscribed_topics(self) -> 'FlinkKafkaConsumerBase':
-        """
-        By default, when restoring from a checkpoint / savepoint, the consumer always ignores
-        restored partitions that are no longer associated with the current specified topics or topic
-        pattern to subscribe to.
-
-        This method does not affect where partitions are read from when the consumer is restored
-        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
-        savepoint, only the offsets in the restored state will be used.
-        """
-        self._j_function = self._j_function \
-            .disableFilterRestoredPartitionsWithSubscribedTopics()
-        return self
-
-    def get_produced_type(self) -> TypeInformation:
-        return typeinfo._from_java_type(self._j_function.getProducedType())
-
-
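Note on migration: the startup-offset methods removed above (set_start_from_earliest, set_start_from_latest, set_start_from_timestamp, set_start_from_group_offsets) roughly correspond to KafkaOffsetsInitializer on the KafkaSource builder that stays in this module. A minimal sketch of that correspondence; the broker address, timestamp, and reset strategy below are placeholder values:

from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaSource)

builder = KafkaSource.builder() \
    .set_bootstrap_servers("broker:9092")  # placeholder address

# set_start_from_earliest()   -> read each partition from the beginning
builder.set_starting_offsets(KafkaOffsetsInitializer.earliest())

# set_start_from_latest()     -> start from the latest offsets
builder.set_starting_offsets(KafkaOffsetsInitializer.latest())

# set_start_from_timestamp(t) -> first offset with timestamp >= t (milliseconds from epoch)
builder.set_starting_offsets(KafkaOffsetsInitializer.timestamp(1672531200000))

# set_start_from_group_offsets() -> committed group offsets, with a reset strategy
# playing the role of 'auto.offset.reset' when no committed offset exists
builder.set_starting_offsets(
    KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST))

Each set_starting_offsets call overrides the previous one, so a real job would pick exactly one initializer.
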
-def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz):
-    if not isinstance(topics, list):
-        topics = [topics]
-    gateway = get_gateway()
-    j_properties = gateway.jvm.java.util.Properties()
-    for key, value in properties.items():
-        j_properties.setProperty(key, value)
-
-    j_flink_kafka_consumer = j_consumer_clz(topics,
-                                            deserialization_schema._j_deserialization_schema,
-                                            j_properties)
-    return j_flink_kafka_consumer
-
-
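The helper removed above also documents the py4j pattern this module keeps using: a Python dict is copied into a java.util.Properties object through the gateway before it is handed to a Java constructor. A standalone sketch of just that conversion; _to_java_properties is a hypothetical name and the config values are placeholders:

from pyflink.java_gateway import get_gateway


def _to_java_properties(config: dict):
    # Copy a Python dict into java.util.Properties via the py4j gateway.
    gateway = get_gateway()
    j_properties = gateway.jvm.java.util.Properties()
    for key, value in config.items():
        j_properties.setProperty(key, value)
    return j_properties


# j_props = _to_java_properties({'bootstrap.servers': 'broker:9092', 'group.id': 'my-group'})
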
-class FlinkKafkaConsumer(FlinkKafkaConsumerBase):
-    """
-    The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
-    Apache Kafka. The consumer can run in multiple parallel instances, each of which will
-    pull data from one or more Kafka partitions.
-
-    The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
-    during a failure, and that the computation processes elements exactly once. (These guarantees
-    naturally assume that Kafka itself does not lose any data.)
-
-    Please note that Flink snapshots the offsets internally as part of its distributed checkpoints.
-    The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in
-    sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of
-    how far the Flink Kafka consumer has consumed a topic.
-
-    Please refer to Kafka's documentation for the available configuration properties:
-    http://kafka.apache.org/documentation.html#newconsumerconfigs
-    """
-
-    def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema,
-                 properties: Dict):
-        """
-        Creates a new Kafka streaming source consumer for Kafka 0.10.x.
-
-        This constructor allows passing multiple topics to the consumer.
-
-        :param topics: The Kafka topics to read from.
-        :param deserialization_schema: The de-/serializer used to convert between Kafka's byte
-                                       messages and Flink's objects.
-        :param properties: The properties that are used to configure both the fetcher and the offset
-                           handler.
-        """
-
-        warnings.warn("Deprecated in 1.16. Use KafkaSource instead.", DeprecationWarning)
-        JFlinkKafkaConsumer = get_gateway().jvm \
-            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
-        j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
-                                                     JFlinkKafkaConsumer)
-        super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer)
-
-
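The constructor above already warns "Deprecated in 1.16. Use KafkaSource instead." A hedged migration sketch using the KafkaSource builder defined later in this module; the broker address, topic, and group id are placeholders, and it assumes a string-valued topic with the Kafka connector jar on the classpath:

from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()

source = KafkaSource.builder() \
    .set_bootstrap_servers("broker:9092") \
    .set_topics("input-topic") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

# env.from_source(...) takes the place of env.add_source(FlinkKafkaConsumer(...)).
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka-source")
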
-# ---- FlinkKafkaProducer ----
-
-
-class Semantic(Enum):
-    """
-    Semantics that can be chosen.
-
-    :data: `EXACTLY_ONCE`:
-
-    The Flink producer will write all messages in a Kafka transaction that will be committed to
-    Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool of
-    FlinkKafkaProducers. Between each checkpoint a new Kafka transaction is created, which is
-    committed on FlinkKafkaProducer.notifyCheckpointComplete(long). If checkpoint
-    complete notifications are running late, FlinkKafkaProducer can run out of
-    FlinkKafkaProducers in the pool. In that case any subsequent
-    FlinkKafkaProducer.snapshotState() requests will fail and the FlinkKafkaProducer will keep
-    using the FlinkKafkaProducer from the previous checkpoint. To decrease the chance of failing
-    checkpoints there are four options:
-
-    1. decrease the number of max concurrent checkpoints
-    2. make checkpoints more reliable (so that they complete faster)
-    3. increase the delay between checkpoints
-    4. increase the size of the FlinkKafkaProducers pool
-
-    :data: `AT_LEAST_ONCE`:
-
-    The Flink producer will wait for all outstanding messages in the Kafka buffers to be
-    acknowledged by the Kafka producer on a checkpoint.
-
-    :data: `NONE`:
-
-    Means that nothing will be guaranteed. Messages can be lost and/or duplicated in case of
-    failure.
-
-    """
-
-    EXACTLY_ONCE = 0,
-    AT_LEAST_ONCE = 1,
-    NONE = 2
-
-    def _to_j_semantic(self):
-        JSemantic = get_gateway().jvm \
-            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
-        return getattr(JSemantic, self.name)
-
-
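In the current connector the role of Semantic is played by DeliveryGuarantee, which is already imported at the top of this module; the EXACTLY_ONCE, AT_LEAST_ONCE and NONE members carry over name for name. A small sketch of selecting it on the sink builder; the transactional id prefix is a placeholder and only matters for exactly-once:

from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSink

# Semantic.EXACTLY_ONCE  -> DeliveryGuarantee.EXACTLY_ONCE
# Semantic.AT_LEAST_ONCE -> DeliveryGuarantee.AT_LEAST_ONCE
# Semantic.NONE          -> DeliveryGuarantee.NONE
sink_builder = KafkaSink.builder() \
    .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \
    .set_transactional_id_prefix("tx-prefix")  # placeholder prefix for Kafka transactions
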
-class FlinkKafkaProducerBase(SinkFunction, ABC):
-    """
-    Flink Sink to produce data into a Kafka topic.
-
-    Please note that this producer provides at-least-once reliability guarantees when checkpoints
-    are enabled and set_flush_on_checkpoint(True) is set. Otherwise, the producer doesn't provide any
-    reliability guarantees.
-    """
-
-    def __init__(self, j_flink_kafka_producer):
-        super(FlinkKafkaProducerBase, self).__init__(sink_func=j_flink_kafka_producer)
-
-    def set_log_failures_only(self, log_failures_only: bool) -> 'FlinkKafkaProducerBase':
-        """
-        Defines whether the producer should fail on errors, or only log them. If this is set to
-        true, then exceptions will only be logged; if set to false, exceptions will eventually be
-        thrown and cause the streaming program to fail (and enter recovery).
-
-        :param log_failures_only: The flag to indicate logging-only on exceptions.
-        """
-        self._j_function.setLogFailuresOnly(log_failures_only)
-        return self
-
-    def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 'FlinkKafkaProducerBase':
-        """
-        If set to true, the Flink producer will wait for all outstanding messages in the Kafka
-        buffers to be acknowledged by the Kafka producer on a checkpoint.
-
-        This way, the producer can guarantee that messages in the Kafka buffers are part of the
-        checkpoint.
-
-        :param flush_on_checkpoint: Flag indicating the flush mode (true = flush on checkpoint)
-        """
-        self._j_function.setFlushOnCheckpoint(flush_on_checkpoint)
-        return self
-
-    def set_write_timestamp_to_kafka(self,
-                                     write_timestamp_to_kafka: bool) -> 'FlinkKafkaProducerBase':
-        """
-        If set to true, Flink will write the (event time) timestamp attached to each record into
-        Kafka. Timestamps must be positive for Kafka to accept them.
-
-        :param write_timestamp_to_kafka: Flag indicating if Flink's internal timestamps are written
-                                         to Kafka.
-        """
-        self._j_function.setWriteTimestampToKafka(write_timestamp_to_kafka)
-        return self
-
-
-class FlinkKafkaProducer(FlinkKafkaProducerBase):
-    """
-    Flink Sink to produce data into a Kafka topic. By default, the producer will use the
-    AT_LEAST_ONCE semantic. Before using EXACTLY_ONCE, please refer to Flink's Kafka connector
-    documentation.
-    """
-
-    def __init__(self, topic: str, serialization_schema: SerializationSchema,
-                 producer_config: Dict, kafka_producer_pool_size: int = 5,
-                 semantic=Semantic.AT_LEAST_ONCE):
-        """
-        Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic.
-
-        Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner.
-        This default partitioner maps each sink subtask to a single Kafka partition (i.e. all
-        records received by a sink subtask will end up in the same Kafka partition).
-
-        :param topic: ID of the Kafka topic.
-        :param serialization_schema: User defined key-less serialization schema.
-        :param producer_config: Properties with the producer configuration.
-        """
-        gateway = get_gateway()
-        j_properties = gateway.jvm.java.util.Properties()
-        for key, value in producer_config.items():
-            j_properties.setProperty(key, value)
-
-        JFlinkKafkaProducer = gateway.jvm \
-            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
-
-        j_flink_kafka_producer = JFlinkKafkaProducer(
-            topic, serialization_schema._j_serialization_schema, j_properties, None,
-            semantic._to_j_semantic(), kafka_producer_pool_size)
-        super(FlinkKafkaProducer, self).__init__(j_flink_kafka_producer=j_flink_kafka_producer)
-
-    def ignore_failures_after_transaction_timeout(self) -> 'FlinkKafkaProducer':
-        """
-        Disables the propagation of exceptions thrown when committing presumably timed out Kafka
-        transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
-        never be successful. Hence, use this feature to avoid recovery loops of the job. Exceptions
-        will still be logged to inform the user that data loss might have occurred.
-
-        Note that we use System.currentTimeMillis() to track the age of a transaction. Moreover,
-        only exceptions thrown during the recovery are caught, i.e., the producer will attempt at
-        least one commit of the transaction before giving up.
-
-        :return: This FlinkKafkaProducer.
-        """
-        self._j_function.ignoreFailuresAfterTransactionTimeout()
-        return self
-
-
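FlinkKafkaProducer is superseded by the KafkaSink built further down in this module. A hedged migration sketch; the broker address and topic are placeholders, and it assumes a DataStream of strings with the Kafka connector jar on the classpath:

from pyflink.common import SimpleStringSchema
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema, KafkaSink

sink = KafkaSink.builder() \
    .set_bootstrap_servers("broker:9092") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("output-topic")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()

# ds.sink_to(sink) takes the place of ds.add_sink(FlinkKafkaProducer(...)).
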
 # ---- KafkaSource ----
 
 