@@ -30,7 +30,7 @@
* used as the end of the range to indicate infinity.
*
* <p>An offset range is considered growable when the end offset could grow (or change) during
* execution time (e.g., Kafka topic partition offset, appended file, ...).
* execution time (e.g., appended file, ...).
*
* <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
*/
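For orientation only (not part of this change): a minimal sketch of how a growable tracker is typically driven over an append-only source. The AppendOnlySource interface and its methods are hypothetical stand-ins; only the GrowableOffsetRangeTracker calls are real API, and offsets must be claimed in increasing order.

import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;

class GrowableRangeSketch {
  // Hypothetical hooks: where the next record offset comes from and how the
  // current end of the stream is estimated.
  interface AppendOnlySource {
    long nextOffset();

    long estimateEndOffset();

    boolean isFinished();
  }

  void consume(AppendOnlySource source) {
    // The end estimator is consulted lazily, so the range can keep growing
    // while the restriction is being processed.
    GrowableOffsetRangeTracker tracker =
        new GrowableOffsetRangeTracker(0L, source::estimateEndOffset);
    while (!source.isFinished() && tracker.tryClaim(source.nextOffset())) {
      // Process the record at the claimed offset ...
    }
    if (source.isFinished()) {
      // Claiming Long.MAX_VALUE marks the growable range as done.
      tracker.tryClaim(Long.MAX_VALUE);
    }
  }
}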
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.splittabledofn;

import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A {@link RestrictionTracker} that wraps another {@link RestrictionTracker} whose restriction
* must not be split.
*
* <p>A restriction is considered unsplittable when the restrictions of an element must not be
* processed concurrently (e.g., a Kafka topic partition).
*/
public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
private final RestrictionTracker<RestrictionT, PositionT> tracker;

public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
this.tracker = tracker;
}

@Override
public boolean tryClaim(PositionT position) {
return tracker.tryClaim(position);
}

@Override
public RestrictionT currentRestriction() {
return tracker.currentRestriction();
}

@Override
public @Nullable SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
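// Delegate only a checkpoint (fractionOfRemainder == 0.0) or a whole-remainder request
// (>= 1.0) to the wrapped tracker; any fractional dynamic-split request is refused by
// returning null so the restriction is never split mid-processing.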
return fractionOfRemainder > 0.0 && fractionOfRemainder < 1.0
? null
: tracker.trySplit(fractionOfRemainder);
}

@Override
public void checkDone() throws IllegalStateException {
tracker.checkDone();
}

@Override
public IsBounded isBounded() {
return tracker.isBounded();
}

@Override
public Progress getProgress() {
return tracker instanceof RestrictionTracker.HasProgress
? ((RestrictionTracker.HasProgress) tracker).getProgress()
: Progress.NONE;
}
}
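A minimal sketch of the intended wrapper behavior, assuming the new class above is on the classpath; the [0, 100) range and the split fractions are arbitrary choices for illustration.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;

class UnsplittableTrackerSketch {
  void demo() {
    UnsplittableRestrictionTracker<OffsetRange, Long> tracker =
        new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(new OffsetRange(0, 100)));

    tracker.tryClaim(0L);

    // A dynamic split request strictly between 0 and 1 is refused, so the restriction stays whole.
    SplitResult<OffsetRange> dynamicSplit = tracker.trySplit(0.5);
    // dynamicSplit == null

    // A checkpoint (fraction 0.0) is still delegated to the wrapped OffsetRangeTracker,
    // which returns the processed primary and the unprocessed residual.
    SplitResult<OffsetRange> checkpoint = tracker.trySplit(0.0);
    // checkpoint.getPrimary() == [0, 1), checkpoint.getResidual() == [1, 100)
  }
}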
@@ -45,6 +45,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -108,6 +109,15 @@
*
* <h4>Splitting</h4>
*
* <p>Consumer group members must not consume from the same {@link TopicPartition} simultaneously
* when {@code enable.auto.commit} is set, since doing so may arbitrarily overwrite the consumer
* group's committed offset for that {@link TopicPartition}. Restriction trackers for a {@link
* KafkaSourceDescriptor} are therefore wrapped in an {@link UnsplittableRestrictionTracker} and
* only return a non-null {@link org.apache.beam.sdk.transforms.splittabledofn.SplitResult} for a
* checkpoint. To the extent possible in the SDK, this reduces the risk of overwriting committed
* offsets when {@code enable.auto.commit} is set and prevents concurrent use of the cached
* per-{@link TopicPartition} {@link Consumer} resources.
*
* <p>TODO(https://github.com/apache/beam/issues/20280): Add support for initial splitting.
*
* <h4>Checkpoint and Resume Processing</h4>
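Relating to the Splitting notes above, a hedged sketch of the kind of read where this matters: a pipeline that turns on enable.auto.commit through consumer config updates. The bootstrap servers and topic name are hypothetical; only the KafkaIO.Read configuration methods are real API.

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

class AutoCommitReadSketch {
  static void build(Pipeline pipeline) {
    // With enable.auto.commit on, the consumer itself commits the offsets it has read;
    // keeping each TopicPartition's restriction unsplittable avoids two workers racing
    // to overwrite the consumer group's committed offset for the same partition.
    pipeline.apply(
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("kafka:9092") // hypothetical cluster
            .withTopic("events") // hypothetical topic
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(
                Collections.<String, Object>singletonMap(
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)));
  }
}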
@@ -488,20 +498,21 @@ public double getSize(

@NewTracker
@RequiresNonNull({"latestOffsetEstimatorCache"})
public OffsetRangeTracker restrictionTracker(
public UnsplittableRestrictionTracker<OffsetRange, Long> restrictionTracker(
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;

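// Both the bounded and the growable tracker are wrapped so that a partition's restriction
// can only be split at a checkpoint (see UnsplittableRestrictionTracker) and is never handed
// out for concurrent processing.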
if (restriction.getTo() < Long.MAX_VALUE) {
return new OffsetRangeTracker(restriction);
return new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(restriction));
}

// OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
// so we want to minimize the number of connections that we start and track with Kafka. Another
// benefit is that the estimator memoizes its backlog, which makes estimations more reusable.
return new GrowableOffsetRangeTracker(
restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor));
return new UnsplittableRestrictionTracker<>(
new GrowableOffsetRangeTracker(
restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor)));
}

@ProcessElement
Expand Up @@ -813,6 +813,37 @@ public void testKafkaWithDelayedStopReadingFunction() {
runWithStopReadingFn(checkStopReadingFn, "delayed-stop-reading", sourceOptions.numRecords);
}

@Test
public void testKafkaWithStopReadTime() throws IOException {
writePipeline
.apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
.apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
.apply(
"Write to Kafka",
writeToKafka().withTopic(options.getKafkaTopic() + "-stop-read-time"));

PipelineResult writeResult = writePipeline.run();
PipelineResult.State writeState = writeResult.waitUntilFinish();
assertNotEquals(PipelineResult.State.FAILED, writeState);

sdfReadPipeline.getOptions().as(Options.class).setStreaming(false);
PCollection<KafkaRecord<byte[], byte[]>> rows =
sdfReadPipeline.apply(
"Read from bounded Kafka",
readFromKafka()
.withTopic(options.getKafkaTopic() + "-stop-read-time")
.withStopReadTime(
org.joda.time.Instant.ofEpochMilli(
new MetricsReader(writeResult, NAMESPACE)
.getEndTimeMetric(WRITE_TIME_METRIC_NAME))));

PipelineResult readResult = sdfReadPipeline.run();
PipelineResult.State readState =
readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
cancelIfTimeouted(readResult, readState);
assertNotEquals(PipelineResult.State.FAILED, readState);
}

public static final Schema KAFKA_TOPIC_SCHEMA =
Schema.builder()
.addStringField("name")