From 3c73487abec28a9e1bb546c3c0ef3587b9c1420c Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Fri, 28 Nov 2025 12:34:01 +0000
Subject: [PATCH 1/9] Make ReadFromKafkaDoFn restriction trackers unsplittable

---
 .../GrowableOffsetRangeTracker.java           |  2 +-
 .../UnsplittableRestrictionTracker.java       | 69 +++++++++++++++++++
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java  | 16 +++--
 3 files changed, 82 insertions(+), 5 deletions(-)
 create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
index 97b0d9b8e787..75c25118c39a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
@@ -30,7 +30,7 @@
  * used as the end of the range to indicate infinity.
  *
  * <p>An offset range is considered growable when the end offset could grow (or change) during
- * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ * execution time (e.g., appended file, ...).
  *
  * <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
  */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
new file mode 100644
index 000000000000..f6c0e95371e1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+/**
+ * A {@link RestrictionTracker} for wrapping a {@link RestrictionTracker} with unsplittable
+ * restrictions.
+ *
+ * <p>A restriction is considered unsplittable when restrictions of an element must not be processed
+ * simultaneously (e.g., Kafka topic partition).
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
+    extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
+  private final RestrictionTracker<RestrictionT, PositionT> tracker;
+
+  public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
+    this.tracker = tracker;
+  }
+
+  @Override
+  public boolean tryClaim(PositionT position) {
+    return tracker.tryClaim(position);
+  }
+
+  @Override
+  public RestrictionT currentRestriction() {
+    return tracker.currentRestriction();
+  }
+
+  @Override
+  public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
+    return fractionOfRemainder < 1.0 ? null : tracker.trySplit(fractionOfRemainder);
+  }
+
+  @Override
+  public void checkDone() throws IllegalStateException {
+    tracker.checkDone();
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return tracker.isBounded();
+  }
+
+  @Override
+  public Progress getProgress() {
+    return tracker instanceof RestrictionTracker.HasProgress
+        ? ((RestrictionTracker.HasProgress) tracker).getProgress()
+        : Progress.NONE;
+  }
+}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 60fc9d57a626..461a40164884 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -45,6 +45,7 @@
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -108,6 +109,12 @@
  *
  * <h3>Splitting</h3>
  *
+ * <p>Consumer groups must not consume from the same {@link TopicPartition} simultaneously. Doing so
+ * may arbitrarily overwrite a consumer group's committed offset for a {@link TopicPartition}.
+ * Restriction trackers for a {@link KafkaSourceDescriptor} are wrapped as {@link
+ * UnsplittableRestrictionTracker} and will only return a non-null {@link
+ * SplitResult} for a checkpoint.
+ *
  * <p>TODO(https://github.com/apache/beam/issues/20280): Add support for initial splitting.
  *
  * <h3>Checkpoint and Resume Processing</h3>
@@ -488,20 +495,21 @@ public double getSize(
 
   @NewTracker
   @RequiresNonNull({"latestOffsetEstimatorCache"})
-  public OffsetRangeTracker restrictionTracker(
+  public UnsplittableRestrictionTracker<OffsetRange, Long> restrictionTracker(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
     final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> latestOffsetEstimatorCache =
         this.latestOffsetEstimatorCache;
 
     if (restriction.getTo() < Long.MAX_VALUE) {
-      return new OffsetRangeTracker(restriction);
+      return new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(restriction));
     }
 
     // OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
     // so we want to minimize the amount of connections that we start and track with Kafka. Another
     // point is that it has a memoized backlog, and this should make that more reusable estimations.
-    return new GrowableOffsetRangeTracker(
-        restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor));
+    return new UnsplittableRestrictionTracker<>(
+        new GrowableOffsetRangeTracker(
+            restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor)));
   }
 
   @ProcessElement

From 50007352676de4f6aeaf033a8aa516bf150d0632 Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Fri, 28 Nov 2025 12:54:23 +0000
Subject: [PATCH 2/9] Fix doc link

---
 .../java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 461a40164884..3459447660de 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -113,7 +113,7 @@
  * may arbitrarily overwrite a consumer group's committed offset for a {@link TopicPartition}.
  * Restriction trackers for a {@link KafkaSourceDescriptor} are wrapped as {@link
  * UnsplittableRestrictionTracker} and will only return a non-null {@link
- * SplitResult} for a checkpoint.
+ * org.apache.beam.sdk.transforms.splittabledofn.SplitResult} for a checkpoint.
  *
  * <p>TODO(https://github.com/apache/beam/issues/20280): Add support for initial splitting.
  *
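[Illustrative aside, not part of the patch series: a minimal JUnit 4 sketch of
the wrapper as introduced in PATCH 1/9. The test class and method names are
hypothetical, and only behavior that stays stable across the series is
asserted, since checkpoint handling (fractionOfRemainder == 0.0) changes in
PATCH 4/9.]

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertNull;
    import static org.junit.Assert.assertTrue;

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;
    import org.junit.Test;

    public class UnsplittableRestrictionTrackerSketchTest {
      @Test
      public void claimsDelegateButInteriorSplitsAreRefused() {
        UnsplittableRestrictionTracker<OffsetRange, Long> tracker =
            new UnsplittableRestrictionTracker<>(
                new OffsetRangeTracker(new OffsetRange(0L, 10L)));

        // Claims and the current restriction pass through to the wrapped tracker.
        assertTrue(tracker.tryClaim(0L));
        assertEquals(new OffsetRange(0L, 10L), tracker.currentRestriction());

        // An interior fraction is refused, keeping the whole restriction on one worker.
        assertNull(tracker.trySplit(0.5));
      }
    }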
From 1b0e57f10ea2dc3cc3a7c50cf8fbc4ed5f53770a Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Fri, 28 Nov 2025 14:04:10 +0000
Subject: [PATCH 3/9] Remove nullness suppression in UnsplittableRestrictionTracker

---
 .../splittabledofn/UnsplittableRestrictionTracker.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
index f6c0e95371e1..f541be411ade 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms.splittabledofn;
 
+import org.checkerframework.checker.nullness.qual.Nullable;
+
 /**
  * A {@link RestrictionTracker} for wrapping a {@link RestrictionTracker} with unsplittable
  * restrictions.
@@ -24,9 +26,6 @@
  * <p>A restriction is considered unsplittable when restrictions of an element must not be processed
  * simultaneously (e.g., Kafka topic partition).
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
     extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
   private final RestrictionTracker<RestrictionT, PositionT> tracker;
@@ -46,7 +45,7 @@ public RestrictionT currentRestriction() {
   }
 
   @Override
-  public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
+  public @Nullable SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
     return fractionOfRemainder < 1.0 ? null : tracker.trySplit(fractionOfRemainder);
   }
 

From d73b188e81753ec0b965ae02dea63d877d475803 Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Fri, 28 Nov 2025 14:06:20 +0000
Subject: [PATCH 4/9] Delegate trySplit when fractionOfRemainder == 0 in UnsplittableRestrictionTracker

---
 .../splittabledofn/UnsplittableRestrictionTracker.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
index f541be411ade..a8cd5713252b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
@@ -46,7 +46,7 @@ public RestrictionT currentRestriction() {
 
   @Override
   public @Nullable SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
-    return fractionOfRemainder < 1.0 ? null : tracker.trySplit(fractionOfRemainder);
+    return fractionOfRemainder > 0.0 && fractionOfRemainder < 1.0 ? null : tracker.trySplit(fractionOfRemainder);
   }
 
   @Override
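[Illustrative aside, not part of the patch series: with PATCH 4/9 applied, a
checkpoint reaches the wrapped tracker again while interior fractions stay
refused. The exact primary/residual bounds below assume OffsetRangeTracker's
usual checkpoint split at lastAttemptedOffset + 1; JUnit assertions are
assumed in scope.]

    static void checkpointStillWorks() {
      UnsplittableRestrictionTracker<OffsetRange, Long> tracker =
          new UnsplittableRestrictionTracker<>(
              new OffsetRangeTracker(new OffsetRange(0L, 10L)));
      tracker.tryClaim(0L);

      // fractionOfRemainder == 0.0 is now delegated: the primary keeps [0, 1)
      // and the residual [1, 10) is handed back to the runner.
      SplitResult<OffsetRange> checkpoint = tracker.trySplit(0.0);
      assertEquals(new OffsetRange(0L, 1L), checkpoint.getPrimary());
      assertEquals(new OffsetRange(1L, 10L), checkpoint.getResidual());

      // Interior fractions remain refused.
      assertNull(tracker.trySplit(0.25));
    }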
From 18b947bb095fe36fc3d78a91da34394ec05ddde6 Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Fri, 28 Nov 2025 14:21:16 +0000
Subject: [PATCH 5/9] Clarify splitting behavior in docs

---
 .../apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 3459447660de..cb0d3db1dae4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -109,11 +109,13 @@
  *
  * <h3>Splitting</h3>
  *
- * <p>Consumer groups must not consume from the same {@link TopicPartition} simultaneously. Doing so
- * may arbitrarily overwrite a consumer group's committed offset for a {@link TopicPartition}.
- * Restriction trackers for a {@link KafkaSourceDescriptor} are wrapped as {@link
- * UnsplittableRestrictionTracker} and will only return a non-null {@link
- * org.apache.beam.sdk.transforms.splittabledofn.SplitResult} for a checkpoint.
+ * <p>Consumer group members must not consume from the same {@link TopicPartition} simultaneously
+ * when {@code enable.auto.commit} is set. Doing so may arbitrarily overwrite a consumer group's
+ * committed offset for a {@link TopicPartition}. Restriction trackers for a {@link
+ * KafkaSourceDescriptor} are wrapped as {@link UnsplittableRestrictionTracker}
+ * and will only return a non-null {@link org.apache.beam.sdk.transforms.splittabledofn.SplitResult}
+ * for a checkpoint. This ensures consistent behavior when {@code enable.auto.commit} is set and
+ * prevents concurrent use of per-{@link TopicPartition} cached {@link Consumer} resources.
  *
  * <p>TODO(https://github.com/apache/beam/issues/20280): Add support for initial splitting.
  *
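[Illustrative aside, not part of the patch series: the auto-commit scenario the
new paragraph warns about. Broker address, topic, and group id are
placeholders; imports from java.util and the Kafka client libraries are
assumed.]

    // With enable.auto.commit=true, the consumer behind each reader commits
    // offsets under the shared group id, so two readers consuming the same
    // TopicPartition could overwrite each other's committed position.
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

    KafkaIO.Read<byte[], byte[]> read =
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("demo-topic")
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(config);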
From 66809e5efe856f89f84d752c64f68f10831ff6f4 Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Fri, 28 Nov 2025 15:12:55 +0000
Subject: [PATCH 6/9] Allow splitting on fractionOfRemainder == 0 in UnsplittableRestrictionTracker

---
 .../splittabledofn/UnsplittableRestrictionTracker.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
index a8cd5713252b..e09ebfba37fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
@@ -46,7 +46,9 @@ public RestrictionT currentRestriction() {
 
   @Override
   public @Nullable SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
-    return fractionOfRemainder > 0.0 && fractionOfRemainder < 1.0 ? null : tracker.trySplit(fractionOfRemainder);
+    return fractionOfRemainder > 0.0 && fractionOfRemainder < 1.0
+        ? null
+        : tracker.trySplit(fractionOfRemainder);
   }
 
   @Override

From 5b502fff646e76eecd2d71ff142a952197873a6e Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Fri, 28 Nov 2025 15:45:47 +0000
Subject: [PATCH 7/9] Add bounded read test for KafkaIO SDF

---
 .../apache/beam/sdk/io/kafka/KafkaIOIT.java | 32 +++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 0e8cbd2183ca..6fa51d1be3f5 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -813,6 +813,38 @@ public void testKafkaWithDelayedStopReadingFunction() {
     runWithStopReadingFn(checkStopReadingFn, "delayed-stop-reading", sourceOptions.numRecords);
   }
 
+  @Test
+  public void testKafkaWithStopReadTime() throws IOException {
+    writePipeline
+        .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
+        .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
+        .apply(
+            "Write to Kafka",
+            writeToKafka().withTopic(options.getKafkaTopic() + "-stop-read-time"));
+
+    PipelineResult writeResult = writePipeline.run();
+    PipelineResult.State writeState = writeResult.waitUntilFinish();
+    cancelIfTimeouted(writeResult, writeState);
+    assertNotEquals(PipelineResult.State.FAILED, writeState);
+
+    sdfReadPipeline.getOptions().as(Options.class).setStreaming(false);
+    PCollection<KafkaRecord<byte[], byte[]>> rows =
+        sdfReadPipeline.apply(
+            "Read from bounded Kafka",
+            readFromKafka()
+                .withTopic(options.getKafkaTopic() + "-stop-read-time")
+                .withStopReadTime(
+                    org.joda.time.Instant.ofEpochMilli(
+                        new MetricsReader(writeResult, NAMESPACE)
+                            .getEndTimeMetric(WRITE_TIME_METRIC_NAME))));
+
+    PipelineResult readResult = sdfReadPipeline.run();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+    cancelIfTimeouted(readResult, readState);
+    assertNotEquals(PipelineResult.State.FAILED, readState);
+  }
+
   public static final Schema KAFKA_TOPIC_SCHEMA =
       Schema.builder()
           .addStringField("name")
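[Illustrative aside, not part of the patch series: outside the IT harness, the
bounded-read shape exercised by the new test reduces to supplying a stop read
time on a regular KafkaIO read. Broker address and topic are placeholders;
pipeline is an assumed Pipeline instance.]

    PCollection<KafkaRecord<byte[], byte[]>> records =
        pipeline.apply(
            KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("demo-topic")
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                // Reading stops once this instant is reached, which bounds the
                // otherwise unbounded source, mirroring the test above.
                .withStopReadTime(org.joda.time.Instant.now()));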
From 40a63f81cc727ec83649ea6e793577293d786b26 Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Fri, 28 Nov 2025 16:51:40 +0000
Subject: [PATCH 8/9] Remove cancelIfTimeouted on writePipeline

---
 .../src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 6fa51d1be3f5..b1133eadb1cb 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -824,7 +824,6 @@ public void testKafkaWithStopReadTime() throws IOException {
 
     PipelineResult writeResult = writePipeline.run();
     PipelineResult.State writeState = writeResult.waitUntilFinish();
-    cancelIfTimeouted(writeResult, writeState);
     assertNotEquals(PipelineResult.State.FAILED, writeState);
 
     sdfReadPipeline.getOptions().as(Options.class).setStreaming(false);

From f0eeec06b2e86bed802f55d36809116632b83c4a Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Mon, 1 Dec 2025 12:14:48 +0000
Subject: [PATCH 9/9] Remove the use of "consistent" in new documentation

---
 .../java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index cb0d3db1dae4..a05abba06e75 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -114,8 +114,9 @@
  * committed offset for a {@link TopicPartition}. Restriction trackers for a {@link
  * KafkaSourceDescriptor} are wrapped as {@link UnsplittableRestrictionTracker}
  * and will only return a non-null {@link org.apache.beam.sdk.transforms.splittabledofn.SplitResult}
- * for a checkpoint. This ensures consistent behavior when {@code enable.auto.commit} is set and
- * prevents concurrent use of per-{@link TopicPartition} cached {@link Consumer} resources.
+ * for a checkpoint. To the extent possible in the SDK, this reduces the risk of overwriting
+ * committed offsets when {@code enable.auto.commit} is set and prevents concurrent use of
+ * per-{@link TopicPartition} cached {@link Consumer} resources.
  *
  * <p>TODO(https://github.com/apache/beam/issues/20280): Add support for initial splitting.
  *
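[Illustrative aside, not part of the patch series: the unbounded case from
ReadFromKafkaDoFn#restrictionTracker, sketched with a constant end-offset
estimate standing in for the cached KafkaLatestOffsetEstimator. JUnit
assertions are assumed in scope.]

    static void growableRangeCheckpointsOnly() {
      GrowableOffsetRangeTracker growable =
          new GrowableOffsetRangeTracker(0L, () -> 1_000L); // stub estimator
      UnsplittableRestrictionTracker<OffsetRange, Long> tracker =
          new UnsplittableRestrictionTracker<>(growable);

      tracker.tryClaim(41L);
      // Only a checkpoint yields a non-null result; the unclaimed tail
      // [42, Long.MAX_VALUE) returns to the runner as the residual.
      SplitResult<OffsetRange> checkpoint = tracker.trySplit(0.0);
      assertNotNull(checkpoint);
      assertNull(tracker.trySplit(0.5)); // interior splits are still refused
    }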