Skip to content

Commit 3736c9b

Browse files
authored
Add dynamic read support to KafkaIO in Python SDK (#35292)
* Add dynamic read support to KafkaIO in Python SDK * removed the comment
1 parent cdf5061 commit 3736c9b

File tree

2 files changed

+24
-8
lines changed
  • sdks

2 files changed

+24
-8
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -875,9 +875,13 @@ static <K, V> void setupExternalBuilder(
875875
builder.setStopReadTime(Instant.ofEpochMilli(config.stopReadTime));
876876
}
877877

878-
// We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
879-
// implementation.
880-
builder.setDynamicRead(false);
878+
if (config.dynamicReadPollIntervalSeconds != null) {
879+
builder.setDynamicRead(true);
880+
builder.setWatchTopicPartitionDuration(
881+
Duration.standardSeconds(config.dynamicReadPollIntervalSeconds));
882+
} else {
883+
builder.setDynamicRead(false);
884+
}
881885

882886
if (config.consumerPollingTimeout != null) {
883887
if (config.consumerPollingTimeout <= 0) {
@@ -975,6 +979,7 @@ public static class Configuration {
975979
private Boolean redistribute;
976980
private Boolean allowDuplicates;
977981
private Boolean offsetDeduplication;
982+
private Long dynamicReadPollIntervalSeconds;
978983

979984
public void setConsumerConfig(Map<String, String> consumerConfig) {
980985
this.consumerConfig = consumerConfig;
@@ -1035,6 +1040,10 @@ public void setAllowDuplicates(Boolean allowDuplicates) {
10351040
public void setOffsetDeduplication(Boolean offsetDeduplication) {
10361041
this.offsetDeduplication = offsetDeduplication;
10371042
}
1043+
1044+
public void setDynamicReadPollIntervalSeconds(Long dynamicReadPollIntervalSeconds) {
1045+
this.dynamicReadPollIntervalSeconds = dynamicReadPollIntervalSeconds;
1046+
}
10381047
}
10391048
}
10401049

sdks/python/apache_beam/io/kafka.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@
124124
('redistribute', typing.Optional[bool]),
125125
('redistribute_num_keys', typing.Optional[np.int32]),
126126
('allow_duplicates', typing.Optional[bool]),
127+
('dynamic_read_poll_interval_seconds', typing.Optional[int]),
127128
])
128129

129130

@@ -171,6 +172,7 @@ def __init__(
171172
redistribute=False,
172173
redistribute_num_keys=np.int32(0),
173174
allow_duplicates=False,
175+
dynamic_read_poll_interval_seconds: typing.Optional[int] = None,
174176
):
175177
"""
176178
Initializes a read operation from Kafka.
@@ -195,7 +197,7 @@ def __init__(
195197
:param timestamp_policy: The built-in timestamp policy which is used for
196198
extracting timestamp from KafkaRecord.
197199
:param consumer_polling_timeout: Kafka client polling request
198-
timeout time in seconds. A lower timeout optimizes for latency. Increase
200+
timeout time in seconds. A lower timeout optimizes for latency. Increase
199201
the timeout if the consumer is not fetching any records. Default is 2
200202
seconds.
201203
:param with_metadata: whether the returned PCollection should contain
@@ -205,12 +207,15 @@ def __init__(
205207
this only works when using default key and value deserializers where
206208
Java Kafka Reader reads keys and values as 'byte[]'.
207209
:param expansion_service: The address (host:port) of the ExpansionService.
208-
:param redistribute: whether a Redistribute transform should be applied
210+
:param redistribute: whether a Redistribute transform should be applied
209211
immediately after the read.
210-
:param redistribute_num_keys: Configures how many keys the Redistribute
212+
:param redistribute_num_keys: Configures how many keys the Redistribute
211213
spreads the data across.
212-
:param allow_duplicates: whether the Redistribute transform allows for
214+
:param allow_duplicates: whether the Redistribute transform allows for
213215
duplicates (this serves solely as a hint to the underlying runner).
216+
:param dynamic_read_poll_interval_seconds: The interval in seconds at which
217+
to check for new partitions. If not None, dynamic partition discovery
218+
is enabled.
214219
"""
215220
if timestamp_policy not in [ReadFromKafka.processing_time_policy,
216221
ReadFromKafka.create_time_policy,
@@ -235,7 +240,9 @@ def __init__(
235240
consumer_polling_timeout=consumer_polling_timeout,
236241
redistribute=redistribute,
237242
redistribute_num_keys=redistribute_num_keys,
238-
allow_duplicates=allow_duplicates)),
243+
allow_duplicates=allow_duplicates,
244+
dynamic_read_poll_interval_seconds=
245+
dynamic_read_poll_interval_seconds)),
239246
expansion_service or default_io_expansion_service())
240247

241248

0 commit comments

Comments
 (0)