Skip to content

Commit 84dbc7b

Browse files
committed
Kafka source offset-based deduplication.
1 parent ae77169 commit 84dbc7b

File tree

10 files changed

+129
-1142
lines changed

10 files changed

+129
-1142
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OrderedCode.java

Lines changed: 0 additions & 646 deletions
This file was deleted.

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/OrderedCodeTest.java

Lines changed: 0 additions & 489 deletions
This file was deleted.

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.io.kafka;
1919

20+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
21+
2022
import java.io.Serializable;
2123
import java.util.List;
2224
import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
4244
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
4345
private KafkaCheckpointMark() {} // for Avro
4446

47+
private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;
48+
4549
public KafkaCheckpointMark(
4650
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
4751
this.partitions = partitions;
@@ -66,6 +70,23 @@ public String toString() {
6670
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
6771
}
6872

73+
@Override
74+
public byte[] getOffsetLimit() {
75+
if (!reader.isPresent()) {
76+
throw new RuntimeException(
77+
"KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
78+
}
79+
if (!reader.get().offsetBasedDeduplicationSupported()) {
80+
throw new RuntimeException(
81+
"Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
82+
}
83+
84+
// KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
85+
checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
86+
PartitionMark partition = partitions.get(/* index= */ 0);
87+
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
88+
}
89+
6990
/**
7091
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
7192
* partition.

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
610610
.setRedistributed(false)
611611
.setAllowDuplicates(false)
612612
.setRedistributeNumKeys(0)
613+
.setOffsetDeduplication(false)
613614
.build();
614615
}
615616

@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
717718
@Pure
718719
public abstract int getRedistributeNumKeys();
719720

721+
@Pure
722+
public abstract boolean isOffsetDeduplication();
723+
720724
@Pure
721725
public abstract @Nullable Duration getWatchTopicPartitionDuration();
722726

@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(
782786

783787
abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
784788

789+
abstract Builder<K, V> setOffsetDeduplication(boolean offsetDeduplication);
790+
785791
abstract Builder<K, V> setTimestampPolicyFactory(
786792
TimestampPolicyFactory<K, V> timestampPolicyFactory);
787793

@@ -892,6 +898,10 @@ static <K, V> void setupExternalBuilder(
892898
builder.setRedistributeNumKeys(0);
893899
builder.setAllowDuplicates(false);
894900
}
901+
// TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.
902+
if (config.offsetDeduplication != null) {
903+
builder.setOffsetDeduplication(config.offsetDeduplication);
904+
}
895905
}
896906

897907
private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -959,6 +969,7 @@ public static class Configuration {
959969
private Integer redistributeNumKeys;
960970
private Boolean redistribute;
961971
private Boolean allowDuplicates;
972+
private Boolean offsetDeduplication;
962973

963974
public void setConsumerConfig(Map<String, String> consumerConfig) {
964975
this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public void setRedistribute(Boolean redistribute) {
10151026
public void setAllowDuplicates(Boolean allowDuplicates) {
10161027
this.allowDuplicates = allowDuplicates;
10171028
}
1029+
1030+
public void setOffsetDeduplication(Boolean offsetDeduplication) {
1031+
this.offsetDeduplication = offsetDeduplication;
1032+
}
10181033
}
10191034
}
10201035

@@ -1086,6 +1101,10 @@ public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
10861101
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
10871102
}
10881103

1104+
public Read<K, V> withOffsetDeduplication(boolean offsetDeduplication) {
1105+
return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
1106+
}
1107+
10891108
/**
10901109
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
10911110
* from each of the matching topics are read.

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,12 @@ Object getDefaultValue() {
137137
return false;
138138
}
139139
},
140-
;
140+
OFFSET_DEDUPLICATION(LEGACY) {
141+
@Override
142+
Object getDefaultValue() {
143+
return false;
144+
}
145+
};
141146

142147
private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
143148

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2121

22+
import java.nio.ByteBuffer;
23+
import java.nio.charset.StandardCharsets;
2224
import java.util.HashMap;
2325
import java.util.Map;
2426
import java.util.Random;
@@ -142,4 +144,17 @@ void update(double quantity) {
142144
return avg;
143145
}
144146
}
147+
148+
static final class OffsetBasedDeduplication {
149+
private static final ByteBuffer offsetBuffer = ByteBuffer.allocate(Long.BYTES);
150+
151+
static byte[] encodeOffset(long offset) {
152+
offsetBuffer.putLong(/* index= */ 0, offset);
153+
return offsetBuffer.array();
154+
}
155+
156+
static byte[] getUniqueId(String topic, int partition, long offset) {
157+
return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
158+
}
159+
}
145160
}

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,12 @@ public boolean advance() throws IOException {
214214
curTimestamp =
215215
pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
216216
curRecord = record;
217+
if (this.offsetBasedDeduplicationSupported) {
218+
curOffset = KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(offset);
219+
curId =
220+
KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
221+
rawRecord.topic(), rawRecord.partition(), rawRecord.offset());
222+
}
217223

218224
int recordSize =
219225
(rawRecord.key() == null ? 0 : rawRecord.key().length)
@@ -299,6 +305,28 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
299305
return curTimestamp;
300306
}
301307

308+
@Override
309+
public byte[] getCurrentRecordId() throws NoSuchElementException {
310+
if (!this.offsetBasedDeduplicationSupported) {
311+
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
312+
}
313+
if (curId == null) {
314+
throw new NoSuchElementException("KafkaUnboundedReader's curId is null.");
315+
}
316+
return curId;
317+
}
318+
319+
@Override
320+
public byte[] getCurrentRecordOffset() throws NoSuchElementException {
321+
if (!this.offsetBasedDeduplicationSupported) {
322+
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
323+
}
324+
if (curOffset == null) {
325+
throw new NoSuchElementException("KafkaUnboundedReader's curOffset is null.");
326+
}
327+
return curOffset;
328+
}
329+
302330
@Override
303331
public long getSplitBacklogBytes() {
304332
long backlogBytes = 0;
@@ -314,6 +342,10 @@ public long getSplitBacklogBytes() {
314342
return backlogBytes;
315343
}
316344

345+
public boolean offsetBasedDeduplicationSupported() {
346+
return this.offsetBasedDeduplicationSupported;
347+
}
348+
317349
////////////////////////////////////////////////////////////////////////////////////////////////
318350

319351
private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -336,6 +368,10 @@ public long getSplitBacklogBytes() {
336368
private @Nullable Instant curTimestamp = null;
337369
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();
338370

371+
private final boolean offsetBasedDeduplicationSupported;
372+
private byte[] curOffset = new byte[0];
373+
private byte[] curId = new byte[0];
374+
339375
private @Nullable Deserializer<K> keyDeserializerInstance = null;
340376
private @Nullable Deserializer<V> valueDeserializerInstance = null;
341377

@@ -507,6 +543,7 @@ Instant updateAndGetWatermark() {
507543
KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
508544
this.source = source;
509545
this.name = "Reader-" + source.getId();
546+
this.offsetBasedDeduplicationSupported = source.offsetBasedDeduplicationSupported();
510547

511548
List<TopicPartition> partitions =
512549
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,20 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
113113
partitions.size() > 0,
114114
"Could not find any partitions. Please check Kafka configuration and topic names");
115115

116-
int numSplits = Math.min(desiredNumSplits, partitions.size());
117-
// XXX make all splits have the same # of partitions
118-
while (partitions.size() % numSplits > 0) {
119-
++numSplits;
116+
int numSplits;
117+
if (offsetBasedDeduplicationSupported()) {
118+
// Enforce 1:1 split to partition ratio for offset deduplication.
119+
numSplits = partitions.size();
120+
LOG.info(
121+
"Offset-based deduplication is enabled for KafkaUnboundedSource. "
122+
+ "Forcing the number of splits to equal the number of total partitions: {}.",
123+
numSplits);
124+
} else {
125+
numSplits = Math.min(desiredNumSplits, partitions.size());
126+
// Make all splits have the same # of partitions.
127+
while (partitions.size() % numSplits > 0) {
128+
++numSplits;
129+
}
120130
}
121131
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
122132

@@ -177,6 +187,11 @@ public boolean requiresDeduping() {
177187
return false;
178188
}
179189

190+
@Override
191+
public boolean offsetBasedDeduplicationSupported() {
192+
return spec.isOffsetDeduplication();
193+
}
194+
180195
@Override
181196
public Coder<KafkaRecord<K, V>> getOutputCoder() {
182197
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ public void testConstructKafkaRead() throws Exception {
111111
Field.of("consumer_polling_timeout", FieldType.INT64),
112112
Field.of("redistribute_num_keys", FieldType.INT32),
113113
Field.of("redistribute", FieldType.BOOLEAN),
114-
Field.of("allow_duplicates", FieldType.BOOLEAN)))
114+
Field.of("allow_duplicates", FieldType.BOOLEAN),
115+
Field.of("offset_deduplication", FieldType.BOOLEAN)))
115116
.withFieldValue("topics", topics)
116117
.withFieldValue("consumer_config", consumerConfig)
117118
.withFieldValue("key_deserializer", keyDeserializer)
@@ -123,6 +124,7 @@ public void testConstructKafkaRead() throws Exception {
123124
.withFieldValue("redistribute_num_keys", 0)
124125
.withFieldValue("redistribute", false)
125126
.withFieldValue("allow_duplicates", false)
127+
.withFieldValue("offset_deduplication", false)
126128
.build());
127129

128130
RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -247,7 +249,8 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
247249
Field.of("timestamp_policy", FieldType.STRING),
248250
Field.of("redistribute_num_keys", FieldType.INT32),
249251
Field.of("redistribute", FieldType.BOOLEAN),
250-
Field.of("allow_duplicates", FieldType.BOOLEAN)))
252+
Field.of("allow_duplicates", FieldType.BOOLEAN),
253+
Field.of("offset_deduplication", FieldType.BOOLEAN)))
251254
.withFieldValue("topics", topics)
252255
.withFieldValue("consumer_config", consumerConfig)
253256
.withFieldValue("key_deserializer", keyDeserializer)
@@ -258,6 +261,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
258261
.withFieldValue("redistribute_num_keys", 0)
259262
.withFieldValue("redistribute", false)
260263
.withFieldValue("allow_duplicates", false)
264+
.withFieldValue("offset_deduplication", false)
261265
.build());
262266

263267
RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();

sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl
101101
.addBooleanField("redistribute")
102102
.addBooleanField("allows_duplicates")
103103
.addNullableInt32Field("redistribute_num_keys")
104+
.addBooleanField("offset_deduplication")
104105
.addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
105106
.addByteArrayField("timestamp_policy_factory")
106107
.addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -221,6 +222,7 @@ public Row toConfigRow(Read<?, ?> transform) {
221222
fieldValues.put("redistribute", transform.isRedistributed());
222223
fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
223224
fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
225+
fieldValues.put("offset_deduplication", transform.isOffsetDeduplication());
224226
return Row.withSchema(schema).withFieldValues(fieldValues).build();
225227
}
226228

@@ -349,6 +351,10 @@ public Row toConfigRow(Read<?, ?> transform) {
349351
}
350352
}
351353
}
354+
Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
355+
if (offsetDeduplication != null) {
356+
transform = transform.withOffsetDeduplication(offsetDeduplication);
357+
}
352358
Duration maxReadTime = configRow.getValue("max_read_time");
353359
if (maxReadTime != null) {
354360
transform =

0 commit comments

Comments (0)