diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
index 97b0d9b8e787..75c25118c39a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
@@ -30,7 +30,7 @@
* used as the end of the range to indicate infinity.
*
 * <p>An offset range is considered growable when the end offset could grow (or change) during
- * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ * execution time (e.g., appended file, ...).
*
 * <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
*/
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
new file mode 100644
index 000000000000..e09ebfba37fd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A {@link RestrictionTracker} that wraps another {@link RestrictionTracker} to make its
+ * restriction unsplittable.
+ *
+ * <p>A restriction is considered unsplittable when restrictions of an element must not be
+ * processed simultaneously (e.g., Kafka topic partition).
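+ *
+ * <p>A minimal usage sketch (the wrapped {@link OffsetRangeTracker} is just an example of a
+ * delegate):
+ *
+ * <pre>{@code
+ * RestrictionTracker<OffsetRange, Long> base = new OffsetRangeTracker(new OffsetRange(0, 100));
+ * UnsplittableRestrictionTracker<OffsetRange, Long> tracker =
+ *     new UnsplittableRestrictionTracker<>(base);
+ * tracker.trySplit(0.5); // returns null: dynamic splits are refused
+ * tracker.trySplit(0.0); // delegates: checkpointing is still allowed
+ * }</pre>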
+ */
+public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
+    extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
+  private final RestrictionTracker<RestrictionT, PositionT> tracker;
+
+  public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
+ this.tracker = tracker;
+ }
+
+ @Override
+ public boolean tryClaim(PositionT position) {
+ return tracker.tryClaim(position);
+ }
+
+ @Override
+ public RestrictionT currentRestriction() {
+ return tracker.currentRestriction();
+ }
+
+ @Override
+  public @Nullable SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
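+    // Refuse dynamic splits (0 < fractionOfRemainder < 1); delegate a checkpoint split
+    // (fractionOfRemainder == 0) and other values to the wrapped tracker.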
+ return fractionOfRemainder > 0.0 && fractionOfRemainder < 1.0
+ ? null
+ : tracker.trySplit(fractionOfRemainder);
+ }
+
+ @Override
+ public void checkDone() throws IllegalStateException {
+ tracker.checkDone();
+ }
+
+ @Override
+ public IsBounded isBounded() {
+ return tracker.isBounded();
+ }
+
+ @Override
+ public Progress getProgress() {
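+    // Surface the wrapped tracker's progress when it reports any; otherwise report no progress.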
+ return tracker instanceof RestrictionTracker.HasProgress
+ ? ((RestrictionTracker.HasProgress) tracker).getProgress()
+ : Progress.NONE;
+ }
+}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 60fc9d57a626..a05abba06e75 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -45,6 +45,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -108,6 +109,15 @@
*
 * <h4>Splitting</h4>
*
+ * <p>Consumer group members must not consume from the same {@link TopicPartition} simultaneously
+ * when {@code enable.auto.commit} is set. Doing so may arbitrarily overwrite a consumer group's
+ * committed offset for a {@link TopicPartition}. Restriction trackers for a {@link
+ * KafkaSourceDescriptor} are therefore wrapped as {@link UnsplittableRestrictionTracker} and will
+ * only return a non-null {@link org.apache.beam.sdk.transforms.splittabledofn.SplitResult} for a
+ * checkpoint. To the extent possible in the SDK, this reduces the risk of overwriting committed
+ * offsets when {@code enable.auto.commit} is set and prevents concurrent use of cached
+ * per-{@link TopicPartition} {@link Consumer} resources.
+ *
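+ * <p>For illustration, auto-commit could be enabled through the consumer configuration (an
+ * illustrative snippet, not the only way to set it):
+ *
+ * <pre>{@code
+ * KafkaIO.<byte[], byte[]>readSourceDescriptors()
+ *     .withConsumerConfigUpdates(
+ *         ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true));
+ * }</pre>
+ *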
* TODO(https://github.com/apache/beam/issues/20280): Add support for initial splitting.
*
 * <h4>Checkpoint and Resume Processing</h4>
@@ -488,20 +498,21 @@ public double getSize(
@NewTracker
@RequiresNonNull({"latestOffsetEstimatorCache"})
- public OffsetRangeTracker restrictionTracker(
+  public UnsplittableRestrictionTracker<OffsetRange, Long> restrictionTracker(
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> latestOffsetEstimatorCache =
        this.latestOffsetEstimatorCache;
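+    // A bounded restriction has a concrete end offset and tracks a fixed range; an unbounded one
+    // ends at Long.MAX_VALUE and grows with the estimated latest offset. Either way, wrap the
+    // tracker so the restriction cannot be split while it is being processed.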
if (restriction.getTo() < Long.MAX_VALUE) {
- return new OffsetRangeTracker(restriction);
+ return new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(restriction));
}
    // OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
    // so we want to minimize the number of connections that we start and track with Kafka. Another
    // benefit is that the estimator memoizes its backlog, which makes estimations more reusable.
- return new GrowableOffsetRangeTracker(
- restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor));
+ return new UnsplittableRestrictionTracker<>(
+ new GrowableOffsetRangeTracker(
+ restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor)));
}
@ProcessElement
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 0e8cbd2183ca..b1133eadb1cb 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -813,6 +813,37 @@ public void testKafkaWithDelayedStopReadingFunction() {
runWithStopReadingFn(checkStopReadingFn, "delayed-stop-reading", sourceOptions.numRecords);
}
+ @Test
+ public void testKafkaWithStopReadTime() throws IOException {
+ writePipeline
+ .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
+ .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
+ .apply(
+ "Write to Kafka",
+ writeToKafka().withTopic(options.getKafkaTopic() + "-stop-read-time"));
+
+ PipelineResult writeResult = writePipeline.run();
+ PipelineResult.State writeState = writeResult.waitUntilFinish();
+ assertNotEquals(PipelineResult.State.FAILED, writeState);
+
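+    // Read back in batch mode, stopping at the recorded end-of-write timestamp so the
+    // bounded read can drain and finish.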
+ sdfReadPipeline.getOptions().as(Options.class).setStreaming(false);
+    PCollection<KafkaRecord<byte[], byte[]>> rows =
+ sdfReadPipeline.apply(
+ "Read from bounded Kafka",
+ readFromKafka()
+ .withTopic(options.getKafkaTopic() + "-stop-read-time")
+ .withStopReadTime(
+ org.joda.time.Instant.ofEpochMilli(
+ new MetricsReader(writeResult, NAMESPACE)
+ .getEndTimeMetric(WRITE_TIME_METRIC_NAME))));
+
+ PipelineResult readResult = sdfReadPipeline.run();
+ PipelineResult.State readState =
+ readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+ cancelIfTimeouted(readResult, readState);
+ assertNotEquals(PipelineResult.State.FAILED, readState);
+ }
+
public static final Schema KAFKA_TOPIC_SCHEMA =
Schema.builder()
.addStringField("name")