Skip to content

Commit 154b2d1

Browse files
committed
Kafka source offset-based deduplication.
1 parent 3cb1440 commit 154b2d1

File tree

12 files changed

+225
-33
lines changed

12 files changed

+225
-33
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,8 @@ public static void verifyDeterministic(Coder<?> target, String message, Iterable
198198
}
199199
}
200200

201-
public static <T> long getEncodedElementByteSizeUsingCoder(Coder<T> target, T value) throws Exception {
201+
public static <T> long getEncodedElementByteSizeUsingCoder(Coder<T> target, T value)
202+
throws Exception {
202203
return target.getEncodedElementByteSize(value);
203204
}
204205
/**

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.io.kafka;
1919

20+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
21+
2022
import java.io.Serializable;
2123
import java.util.List;
2224
import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
4244
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
4345
private KafkaCheckpointMark() {} // for Avro
4446

47+
private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
48+
4549
public KafkaCheckpointMark(
4650
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
4751
this.partitions = partitions;
@@ -66,6 +70,23 @@ public String toString() {
6670
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
6771
}
6872

73+
@Override
74+
public byte[] getOffsetLimit() {
75+
if (!reader.isPresent()) {
76+
throw new RuntimeException(
77+
"KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
78+
}
79+
if (!reader.get().offsetBasedDeduplicationSupported()) {
80+
throw new RuntimeException(
81+
"Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
82+
}
83+
84+
// KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
85+
checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
86+
PartitionMark partition = partitions.get(/* index= */ 0);
87+
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
88+
}
89+
6990
/**
7091
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
7192
* partition.

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
610610
.setRedistributed(false)
611611
.setAllowDuplicates(false)
612612
.setRedistributeNumKeys(0)
613+
.setOffsetDeduplication(false)
613614
.build();
614615
}
615616

@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
717718
@Pure
718719
public abstract int getRedistributeNumKeys();
719720

721+
@Pure
722+
public abstract @Nullable Boolean getOffsetDeduplication();
723+
720724
@Pure
721725
public abstract @Nullable Duration getWatchTopicPartitionDuration();
722726

@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(
782786

783787
abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
784788

789+
abstract Builder<K, V> setOffsetDeduplication(Boolean offsetDeduplication);
790+
785791
abstract Builder<K, V> setTimestampPolicyFactory(
786792
TimestampPolicyFactory<K, V> timestampPolicyFactory);
787793

@@ -886,11 +892,16 @@ static <K, V> void setupExternalBuilder(
886892
if (config.allowDuplicates != null) {
887893
builder.setAllowDuplicates(config.allowDuplicates);
888894
}
889-
895+
if (config.redistribute
896+
&& (config.allowDuplicates == null || !config.allowDuplicates)
897+
&& config.offsetDeduplication != null) {
898+
builder.setOffsetDeduplication(config.offsetDeduplication);
899+
}
890900
} else {
891901
builder.setRedistributed(false);
892902
builder.setRedistributeNumKeys(0);
893903
builder.setAllowDuplicates(false);
904+
builder.setOffsetDeduplication(false);
894905
}
895906
}
896907

@@ -959,6 +970,7 @@ public static class Configuration {
959970
private Integer redistributeNumKeys;
960971
private Boolean redistribute;
961972
private Boolean allowDuplicates;
973+
private Boolean offsetDeduplication;
962974

963975
public void setConsumerConfig(Map<String, String> consumerConfig) {
964976
this.consumerConfig = consumerConfig;
@@ -1015,6 +1027,10 @@ public void setRedistribute(Boolean redistribute) {
10151027
public void setAllowDuplicates(Boolean allowDuplicates) {
10161028
this.allowDuplicates = allowDuplicates;
10171029
}
1030+
1031+
public void setOffsetDeduplication(Boolean offsetDeduplication) {
1032+
this.offsetDeduplication = offsetDeduplication;
1033+
}
10181034
}
10191035
}
10201036

@@ -1066,26 +1082,21 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
10661082
* Sets redistribute transform that hints to the runner to try to redistribute the work evenly.
10671083
*/
10681084
public Read<K, V> withRedistribute() {
1069-
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
1070-
LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
1071-
}
10721085
return toBuilder().setRedistributed(true).build();
10731086
}
10741087

10751088
public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
1076-
if (!isAllowDuplicates()) {
1077-
LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
1078-
}
10791089
return toBuilder().setAllowDuplicates(allowDuplicates).build();
10801090
}
10811091

10821092
public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
1083-
checkState(
1084-
isRedistributed(),
1085-
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
10861093
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
10871094
}
10881095

1096+
public Read<K, V> withOffsetDeduplication(Boolean offsetDeduplication) {
1097+
return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
1098+
}
1099+
10891100
/**
10901101
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
10911102
* from each of the matching topics are read.
@@ -1541,6 +1552,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
15411552
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
15421553
}
15431554
}
1555+
1556+
checkRedistributeConfiguration();
1557+
15441558
warnAboutUnsafeConfigurations(input);
15451559

15461560
// Infer key/value coders if not specified explicitly
@@ -1573,6 +1587,27 @@ && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
15731587
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
15741588
}
15751589

1590+
private void checkRedistributeConfiguration() {
1591+
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
1592+
LOG.warn(
1593+
"withRedistribute without withRedistributeNumKeys will create a key per record, which is sub-optimal for most use cases.");
1594+
}
1595+
if (isAllowDuplicates()) {
1596+
checkState(
1597+
isRedistributed(), "withAllowDuplicates without withRedistribute will have no effect.");
1598+
}
1599+
if (getRedistributeNumKeys() > 0) {
1600+
checkState(
1601+
isRedistributed(),
1602+
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
1603+
}
1604+
if (getOffsetDeduplication() != null && getOffsetDeduplication()) {
1605+
checkState(
1606+
isRedistributed() && !isAllowDuplicates(),
1607+
"withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false).");
1608+
}
1609+
}
1610+
15761611
private void warnAboutUnsafeConfigurations(PBegin input) {
15771612
Long checkpointingInterval =
15781613
input
@@ -1776,6 +1811,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
17761811
if (kafkaRead.getRedistributeNumKeys() > 0) {
17771812
readTransform = readTransform.withRedistributeNumKeys(kafkaRead.getRedistributeNumKeys());
17781813
}
1814+
if (kafkaRead.getOffsetDeduplication() != null) {
1815+
readTransform = readTransform.withOffsetDeduplication();
1816+
}
17791817
PCollection<KafkaSourceDescriptor> output;
17801818
if (kafkaRead.isDynamicRead()) {
17811819
Set<String> topics = new HashSet<>();
@@ -2221,6 +2259,9 @@ public abstract static class ReadSourceDescriptors<K, V>
22212259
@Pure
22222260
abstract int getRedistributeNumKeys();
22232261

2262+
@Pure
2263+
abstract @Nullable Boolean getOffsetDeduplication();
2264+
22242265
@Pure
22252266
abstract @Nullable TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
22262267

@@ -2293,6 +2334,9 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
22932334

22942335
abstract ReadSourceDescriptors.Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
22952336

2337+
abstract ReadSourceDescriptors.Builder<K, V> setOffsetDeduplication(
2338+
Boolean offsetDeduplication);
2339+
22962340
abstract ReadSourceDescriptors<K, V> build();
22972341
}
22982342

@@ -2308,6 +2352,7 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
23082352
.setRedistribute(false)
23092353
.setAllowDuplicates(false)
23102354
.setRedistributeNumKeys(0)
2355+
.setOffsetDeduplication(false)
23112356
.build()
23122357
.withProcessingTime()
23132358
.withMonotonicallyIncreasingWatermarkEstimator();
@@ -2480,6 +2525,10 @@ public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int redistributeNumKe
24802525
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
24812526
}
24822527

2528+
public ReadSourceDescriptors<K, V> withOffsetDeduplication() {
2529+
return toBuilder().setOffsetDeduplication(true).build();
2530+
}
2531+
24832532
/** Use the creation time of {@link KafkaRecord} as the output timestamp. */
24842533
public ReadSourceDescriptors<K, V> withCreateTime() {
24852534
return withExtractOutputTimestampFn(

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ Object getDefaultValue() {
137137
return false;
138138
}
139139
},
140+
OFFSET_DEDUPLICATION(LEGACY),
140141
;
141142

142143
private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2121

22+
import java.nio.charset.StandardCharsets;
2223
import java.util.HashMap;
2324
import java.util.Map;
2425
import java.util.Random;
2526
import org.apache.beam.sdk.transforms.SerializableFunction;
2627
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
28+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
2729
import org.apache.kafka.clients.consumer.Consumer;
2830
import org.apache.kafka.clients.consumer.ConsumerConfig;
2931
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -142,4 +144,15 @@ void update(double quantity) {
142144
return avg;
143145
}
144146
}
147+
148+
static final class OffsetBasedDeduplication {
149+
150+
static byte[] encodeOffset(long offset) {
151+
return Longs.toByteArray(offset);
152+
}
153+
154+
static byte[] getUniqueId(String topic, int partition, long offset) {
155+
return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
156+
}
157+
}
145158
}

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.kafka.common.TopicPartition;
6767
import org.apache.kafka.common.errors.WakeupException;
6868
import org.apache.kafka.common.serialization.Deserializer;
69+
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
6970
import org.checkerframework.checker.nullness.qual.Nullable;
7071
import org.joda.time.Duration;
7172
import org.joda.time.Instant;
@@ -299,6 +300,34 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
299300
return curTimestamp;
300301
}
301302

303+
private static final byte[] EMPTY_RECORD_ID = new byte[0];
304+
305+
@Override
306+
public byte[] getCurrentRecordId() throws NoSuchElementException {
307+
if (!this.offsetBasedDeduplicationSupported) {
308+
// BoundedReadFromUnboundedSource and tests may call getCurrentRecordId(), even for non offset
309+
// deduplication cases. Therefore, Kafka reader cannot produce an exception when offset
310+
// deduplication is disabled. Instead an empty record ID is provided.
311+
return EMPTY_RECORD_ID;
312+
}
313+
if (curRecord != null) {
314+
return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
315+
curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
316+
}
317+
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
318+
}
319+
320+
@Override
321+
public byte[] getCurrentRecordOffset() throws NoSuchElementException {
322+
if (!this.offsetBasedDeduplicationSupported) {
323+
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
324+
}
325+
if (curRecord != null) {
326+
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(curRecord.getOffset());
327+
}
328+
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
329+
}
330+
302331
@Override
303332
public long getSplitBacklogBytes() {
304333
long backlogBytes = 0;
@@ -314,6 +343,10 @@ public long getSplitBacklogBytes() {
314343
return backlogBytes;
315344
}
316345

346+
public boolean offsetBasedDeduplicationSupported() {
347+
return this.offsetBasedDeduplicationSupported;
348+
}
349+
317350
////////////////////////////////////////////////////////////////////////////////////////////////
318351

319352
private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -332,10 +365,12 @@ public long getSplitBacklogBytes() {
332365
private final String name;
333366
private @Nullable Consumer<byte[], byte[]> consumer = null;
334367
private final List<PartitionState<K, V>> partitionStates;
335-
private @Nullable KafkaRecord<K, V> curRecord = null;
336-
private @Nullable Instant curTimestamp = null;
368+
private @MonotonicNonNull KafkaRecord<K, V> curRecord = null;
369+
private @MonotonicNonNull Instant curTimestamp = null;
337370
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();
338371

372+
private final boolean offsetBasedDeduplicationSupported;
373+
339374
private @Nullable Deserializer<K> keyDeserializerInstance = null;
340375
private @Nullable Deserializer<V> valueDeserializerInstance = null;
341376

@@ -507,6 +542,7 @@ Instant updateAndGetWatermark() {
507542
KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
508543
this.source = source;
509544
this.name = "Reader-" + source.getId();
545+
this.offsetBasedDeduplicationSupported = source.offsetBasedDeduplicationSupported();
510546

511547
List<TopicPartition> partitions =
512548
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,20 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
113113
partitions.size() > 0,
114114
"Could not find any partitions. Please check Kafka configuration and topic names");
115115

116-
int numSplits = Math.min(desiredNumSplits, partitions.size());
117-
// XXX make all splits have the same # of partitions
118-
while (partitions.size() % numSplits > 0) {
119-
++numSplits;
116+
int numSplits;
117+
if (offsetBasedDeduplicationSupported()) {
118+
// Enforce 1:1 split to partition ratio for offset deduplication.
119+
numSplits = partitions.size();
120+
LOG.info(
121+
"Offset-based deduplication is enabled for KafkaUnboundedSource. "
122+
+ "Forcing the number of splits to equal the number of total partitions: {}.",
123+
numSplits);
124+
} else {
125+
numSplits = Math.min(desiredNumSplits, partitions.size());
126+
// Make all splits have the same # of partitions.
127+
while (partitions.size() % numSplits > 0) {
128+
++numSplits;
129+
}
120130
}
121131
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
122132

@@ -177,6 +187,11 @@ public boolean requiresDeduping() {
177187
return false;
178188
}
179189

190+
@Override
191+
public boolean offsetBasedDeduplicationSupported() {
192+
return spec.getOffsetDeduplication() != null && spec.getOffsetDeduplication();
193+
}
194+
180195
@Override
181196
public Coder<KafkaRecord<K, V>> getOutputCoder() {
182197
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ public void testConstructKafkaRead() throws Exception {
145145
expansionService.expand(request, observer);
146146
ExpansionApi.ExpansionResponse result = observer.result;
147147
RunnerApi.PTransform transform = result.getTransform();
148-
System.out.println("xxx : " + result.toString());
149148
assertThat(
150149
transform.getSubtransformsList(),
151150
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));

0 commit comments

Comments
 (0)