Skip to content

Commit aeb6c4b

Browse files
tomstepp and VardhanThigle
authored and committed
Kafka source offset-based deduplication. (apache#33596)
1 parent 8b93ce2 commit aeb6c4b

File tree

11 files changed

+200
-32
lines changed

11 files changed

+200
-32
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.io.kafka;
1919

20+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
21+
2022
import java.io.Serializable;
2123
import java.util.List;
2224
import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
4244
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
4345
private KafkaCheckpointMark() {} // for Avro
4446

47+
private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
48+
4549
public KafkaCheckpointMark(
4650
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
4751
this.partitions = partitions;
@@ -66,6 +70,23 @@ public String toString() {
6670
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
6771
}
6872

73+
@Override
74+
public byte[] getOffsetLimit() {
75+
if (!reader.isPresent()) {
76+
throw new RuntimeException(
77+
"KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
78+
}
79+
if (!reader.get().offsetBasedDeduplicationSupported()) {
80+
throw new RuntimeException(
81+
"Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
82+
}
83+
84+
// KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
85+
checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
86+
PartitionMark partition = partitions.get(/* index= */ 0);
87+
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
88+
}
89+
6990
/**
7091
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
7192
* partition.

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,9 @@ public abstract static class Read<K, V>
717717
@Pure
718718
public abstract int getRedistributeNumKeys();
719719

720+
@Pure
721+
public abstract @Nullable Boolean getOffsetDeduplication();
722+
720723
@Pure
721724
public abstract @Nullable Duration getWatchTopicPartitionDuration();
722725

@@ -782,6 +785,8 @@ abstract Builder<K, V> setConsumerFactoryFn(
782785

783786
abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
784787

788+
abstract Builder<K, V> setOffsetDeduplication(Boolean offsetDeduplication);
789+
785790
abstract Builder<K, V> setTimestampPolicyFactory(
786791
TimestampPolicyFactory<K, V> timestampPolicyFactory);
787792

@@ -886,11 +891,16 @@ static <K, V> void setupExternalBuilder(
886891
if (config.allowDuplicates != null) {
887892
builder.setAllowDuplicates(config.allowDuplicates);
888893
}
889-
894+
if (config.redistribute
895+
&& (config.allowDuplicates == null || !config.allowDuplicates)
896+
&& config.offsetDeduplication != null) {
897+
builder.setOffsetDeduplication(config.offsetDeduplication);
898+
}
890899
} else {
891900
builder.setRedistributed(false);
892901
builder.setRedistributeNumKeys(0);
893902
builder.setAllowDuplicates(false);
903+
builder.setOffsetDeduplication(false);
894904
}
895905
}
896906

@@ -959,6 +969,7 @@ public static class Configuration {
959969
private Integer redistributeNumKeys;
960970
private Boolean redistribute;
961971
private Boolean allowDuplicates;
972+
private Boolean offsetDeduplication;
962973

963974
public void setConsumerConfig(Map<String, String> consumerConfig) {
964975
this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public void setRedistribute(Boolean redistribute) {
10151026
public void setAllowDuplicates(Boolean allowDuplicates) {
10161027
this.allowDuplicates = allowDuplicates;
10171028
}
1029+
1030+
public void setOffsetDeduplication(Boolean offsetDeduplication) {
1031+
this.offsetDeduplication = offsetDeduplication;
1032+
}
10181033
}
10191034
}
10201035

@@ -1066,26 +1081,21 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
10661081
* Sets redistribute transform that hints to the runner to try to redistribute the work evenly.
10671082
*/
10681083
public Read<K, V> withRedistribute() {
1069-
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
1070-
LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
1071-
}
10721084
return toBuilder().setRedistributed(true).build();
10731085
}
10741086

10751087
public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
1076-
if (!isAllowDuplicates()) {
1077-
LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
1078-
}
10791088
return toBuilder().setAllowDuplicates(allowDuplicates).build();
10801089
}
10811090

10821091
public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
1083-
checkState(
1084-
isRedistributed(),
1085-
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
10861092
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
10871093
}
10881094

1095+
public Read<K, V> withOffsetDeduplication(Boolean offsetDeduplication) {
1096+
return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
1097+
}
1098+
10891099
/**
10901100
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
10911101
* from each of the matching topics are read.
@@ -1541,6 +1551,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
15411551
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
15421552
}
15431553
}
1554+
1555+
checkRedistributeConfiguration();
1556+
15441557
warnAboutUnsafeConfigurations(input);
15451558

15461559
// Infer key/value coders if not specified explicitly
@@ -1573,6 +1586,26 @@ && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
15731586
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
15741587
}
15751588

1589+
private void checkRedistributeConfiguration() {
1590+
if (getRedistributeNumKeys() == 0 && isRedistributed()) {
1591+
LOG.warn(
1592+
"withRedistribute without withRedistributeNumKeys will create a key per record, which is sub-optimal for most use cases.");
1593+
}
1594+
if (isAllowDuplicates()) {
1595+
LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
1596+
}
1597+
if (getRedistributeNumKeys() > 0) {
1598+
checkState(
1599+
isRedistributed(),
1600+
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
1601+
}
1602+
if (getOffsetDeduplication() != null && getOffsetDeduplication()) {
1603+
checkState(
1604+
isRedistributed() && !isAllowDuplicates(),
1605+
"withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false).");
1606+
}
1607+
}
1608+
15761609
private void warnAboutUnsafeConfigurations(PBegin input) {
15771610
Long checkpointingInterval =
15781611
input

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ Object getDefaultValue() {
137137
return false;
138138
}
139139
},
140+
OFFSET_DEDUPLICATION(LEGACY),
140141
;
141142

142143
private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2121

22+
import java.nio.charset.StandardCharsets;
2223
import java.util.HashMap;
2324
import java.util.Map;
2425
import java.util.Random;
2526
import org.apache.beam.sdk.transforms.SerializableFunction;
2627
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
28+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
2729
import org.apache.kafka.clients.consumer.Consumer;
2830
import org.apache.kafka.clients.consumer.ConsumerConfig;
2931
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -142,4 +144,15 @@ void update(double quantity) {
142144
return avg;
143145
}
144146
}
147+
148+
static final class OffsetBasedDeduplication {
149+
150+
static byte[] encodeOffset(long offset) {
151+
return Longs.toByteArray(offset);
152+
}
153+
154+
static byte[] getUniqueId(String topic, int partition, long offset) {
155+
return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
156+
}
157+
}
145158
}

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.kafka.common.TopicPartition;
6767
import org.apache.kafka.common.errors.WakeupException;
6868
import org.apache.kafka.common.serialization.Deserializer;
69+
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
6970
import org.checkerframework.checker.nullness.qual.Nullable;
7071
import org.joda.time.Duration;
7172
import org.joda.time.Instant;
@@ -299,6 +300,30 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
299300
return curTimestamp;
300301
}
301302

303+
@Override
304+
public byte[] getCurrentRecordId() throws NoSuchElementException {
305+
if (!offsetBasedDeduplicationSupported()) {
306+
// Defer result to super if offset deduplication is not supported.
307+
return super.getCurrentRecordId();
308+
}
309+
if (curRecord == null) {
310+
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
311+
}
312+
return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
313+
curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
314+
}
315+
316+
@Override
317+
public byte[] getCurrentRecordOffset() throws NoSuchElementException {
318+
if (!offsetBasedDeduplicationSupported()) {
319+
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
320+
}
321+
if (curRecord == null) {
322+
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
323+
}
324+
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(curRecord.getOffset());
325+
}
326+
302327
@Override
303328
public long getSplitBacklogBytes() {
304329
long backlogBytes = 0;
@@ -313,6 +338,10 @@ public long getSplitBacklogBytes() {
313338
return backlogBytes;
314339
}
315340

341+
public boolean offsetBasedDeduplicationSupported() {
342+
return source.offsetBasedDeduplicationSupported();
343+
}
344+
316345
////////////////////////////////////////////////////////////////////////////////////////////////
317346

318347
private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -331,8 +360,8 @@ public long getSplitBacklogBytes() {
331360
private final String name;
332361
private @Nullable Consumer<byte[], byte[]> consumer = null;
333362
private final List<PartitionState<K, V>> partitionStates;
334-
private @Nullable KafkaRecord<K, V> curRecord = null;
335-
private @Nullable Instant curTimestamp = null;
363+
private @MonotonicNonNull KafkaRecord<K, V> curRecord = null;
364+
private @MonotonicNonNull Instant curTimestamp = null;
336365
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();
337366

338367
private @Nullable Deserializer<K> keyDeserializerInstance = null;

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,20 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
113113
partitions.size() > 0,
114114
"Could not find any partitions. Please check Kafka configuration and topic names");
115115

116-
int numSplits = Math.min(desiredNumSplits, partitions.size());
117-
// XXX make all splits have the same # of partitions
118-
while (partitions.size() % numSplits > 0) {
119-
++numSplits;
116+
int numSplits;
117+
if (offsetBasedDeduplicationSupported()) {
118+
// Enforce 1:1 split to partition ratio for offset deduplication.
119+
numSplits = partitions.size();
120+
LOG.info(
121+
"Offset-based deduplication is enabled for KafkaUnboundedSource. "
122+
+ "Forcing the number of splits to equal the number of total partitions: {}.",
123+
numSplits);
124+
} else {
125+
numSplits = Math.min(desiredNumSplits, partitions.size());
126+
// Make all splits have the same # of partitions.
127+
while (partitions.size() % numSplits > 0) {
128+
++numSplits;
129+
}
120130
}
121131
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
122132

@@ -177,6 +187,11 @@ public boolean requiresDeduping() {
177187
return false;
178188
}
179189

190+
@Override
191+
public boolean offsetBasedDeduplicationSupported() {
192+
return spec.getOffsetDeduplication() != null && spec.getOffsetDeduplication();
193+
}
194+
180195
@Override
181196
public Coder<KafkaRecord<K, V>> getOutputCoder() {
182197
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ public void testConstructKafkaRead() throws Exception {
145145
expansionService.expand(request, observer);
146146
ExpansionApi.ExpansionResponse result = observer.result;
147147
RunnerApi.PTransform transform = result.getTransform();
148-
System.out.println("xxx : " + result.toString());
149148
assertThat(
150149
transform.getSubtransformsList(),
151150
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.io.kafka;
1919

2020
import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransform;
21+
import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransformWithOffsetDedup;
2122
import static org.hamcrest.MatcherAssert.assertThat;
2223
import static org.hamcrest.Matchers.containsInAnyOrder;
2324
import static org.hamcrest.Matchers.empty;
@@ -114,7 +115,8 @@ private PipelineResult testReadTransformCreationWithImplementationBoundPropertie
114115
new ValueAsTimestampFn(),
115116
false, /*redistribute*/
116117
false, /*allowDuplicates*/
117-
0)));
118+
0, /*numKeys*/
119+
null /*offsetDeduplication*/)));
118120
return p.run();
119121
}
120122

@@ -139,6 +141,17 @@ public void testReadTransformCreationWithLegacyImplementationBoundProperty() {
139141
assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
140142
}
141143

144+
@Test
145+
public void testReadTransformCreationWithOffsetDeduplication() {
146+
p.apply(mkKafkaReadTransformWithOffsetDedup(1000, new ValueAsTimestampFn()));
147+
PipelineResult r = p.run();
148+
String[] expect =
149+
KafkaIOTest.mkKafkaTopics.stream()
150+
.map(topic -> String.format("kafka:`%s`.%s", KafkaIOTest.mkKafkaServers, topic))
151+
.toArray(String[]::new);
152+
assertThat(Lineage.query(r.metrics(), Lineage.Type.SOURCE), containsInAnyOrder(expect));
153+
}
154+
142155
@Test
143156
public void testReadTransformCreationWithSdfImplementationBoundProperty() {
144157
PipelineResult r =

0 commit comments

Comments (0)