Commit 0307c30

Kafka source offset-based deduplication.

1 parent d204f6a

9 files changed: +99 additions, −2 deletions

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OrderedCode.java renamed to sdks/java/core/src/main/java/org/apache/beam/sdk/util/OrderedCode.java

Lines changed: 1 addition & 1 deletion

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker;
+package org.apache.beam.sdk.util;
 
 import java.math.RoundingMode;
 import java.util.ArrayList;
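
For context (not part of the diff): OrderedCode encodes non-negative integers so that byte-wise lexicographic order of the encodings matches numeric order, which is what makes it usable for offset limits. A minimal sketch of that property, assuming Java 9+ for Arrays.compareUnsigned:

import java.util.Arrays;
import org.apache.beam.sdk.util.OrderedCode;

public class OrderedCodeSketch {
  public static void main(String[] args) {
    OrderedCode lower = new OrderedCode();
    lower.writeNumIncreasing(41L);
    OrderedCode higher = new OrderedCode();
    higher.writeNumIncreasing(42L);
    // Encoded bytes compare the same way the raw offsets do.
    int cmp = Arrays.compareUnsigned(lower.getEncodedBytes(), higher.getEncodedBytes());
    System.out.println(cmp < 0); // true, since 41 < 42
  }
}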

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/OrderedCodeTest.java renamed to sdks/java/core/src/test/java/org/apache/beam/sdk/util/OrderedCodeTest.java

Lines changed: 1 addition & 1 deletion

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker;
+package org.apache.beam.sdk.util;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java

Lines changed: 12 additions & 0 deletions

@@ -66,6 +66,18 @@ public String toString() {
     return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
   }
 
+  @Override
+  public byte[] getOffsetLimit() {
+    // Currently Kafka offset-based deduplication is only supported with a single Kafka partition
+    // per Beam split. Enforce the number of Beam splits with:
+    // "--desiredNumUnboundedSourceSplits=<NumKafkaPartitions>".
+    if (reader.isPresent() && reader.get().offsetDeduplication()) {
+      PartitionMark partition = partitions.get(0);
+      return KafkaIOUtils.getOrderedCode(partition.getNextOffset());
+    }
+    return new byte[0];
+  }
+
   /**
    * A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
    * partition.
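
The comment in getOffsetLimit is the operational requirement for this feature: offset-based deduplication assumes exactly one Kafka partition per Beam split. A sketch of enforcing that at launch time, assuming a topic with 8 partitions (flag name taken from the comment above):

--desiredNumUnboundedSourceSplits=8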

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 19 additions & 0 deletions

@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
         .setRedistributed(false)
         .setAllowDuplicates(false)
         .setRedistributeNumKeys(0)
+        .setOffsetDeduplication(false)
         .build();
   }
 
@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
     @Pure
     public abstract int getRedistributeNumKeys();
 
+    @Pure
+    public abstract boolean isOffsetDeduplication();
+
     @Pure
     public abstract @Nullable Duration getWatchTopicPartitionDuration();
 
@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(
 
      abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
 
+     abstract Builder<K, V> setOffsetDeduplication(boolean offsetDeduplication);
+
      abstract Builder<K, V> setTimestampPolicyFactory(
          TimestampPolicyFactory<K, V> timestampPolicyFactory);
 
@@ -892,6 +898,10 @@ static <K, V> void setupExternalBuilder(
         builder.setRedistributeNumKeys(0);
         builder.setAllowDuplicates(false);
       }
+      // TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.
+      if (config.offsetDeduplication != null) {
+        builder.setOffsetDeduplication(config.offsetDeduplication);
+      }
     }
 
     private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -959,6 +969,7 @@ public static class Configuration {
      private Integer redistributeNumKeys;
      private Boolean redistribute;
      private Boolean allowDuplicates;
+     private Boolean offsetDeduplication;
 
      public void setConsumerConfig(Map<String, String> consumerConfig) {
        this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public void setRedistribute(Boolean redistribute) {
      public void setAllowDuplicates(Boolean allowDuplicates) {
        this.allowDuplicates = allowDuplicates;
      }
+
+     public void setOffsetDeduplication(Boolean offsetDeduplication) {
+       this.offsetDeduplication = offsetDeduplication;
+     }
    }
  }
 
@@ -1086,6 +1101,10 @@ public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
      return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
    }
 
+    public Read<K, V> withOffsetDeduplication(boolean offsetDeduplication) {
+      return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
+    }
+
    /**
     * Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
     * from each of the matching topics are read.
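
For orientation, a minimal sketch of how a pipeline author would opt in to the new option; the bootstrap servers, topic, and deserializers below are placeholders, not taken from this commit:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetDedupExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker:9092")            // placeholder address
            .withTopic("events")                            // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withOffsetDeduplication(true));                // the option added in this commit
    pipeline.run();
  }
}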

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java

Lines changed: 6 additions & 0 deletions

@@ -137,6 +137,12 @@ Object getDefaultValue() {
       return false;
     }
   },
+  OFFSET_DEDUPLICATION(LEGACY) {
+    @Override
+    Object getDefaultValue() {
+      return false;
+    }
+  }
   ;
 
   private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java

Lines changed: 12 additions & 0 deletions

@@ -19,10 +19,12 @@
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.OrderedCode;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -142,4 +144,14 @@ void update(double quantity) {
       return avg;
     }
   }
+
+  static byte[] getOrderedCode(long offset) {
+    OrderedCode orderedCode = new OrderedCode();
+    orderedCode.writeNumIncreasing(offset);
+    return orderedCode.getEncodedBytes();
+  }
+
+  static byte[] getUniqueId(String topic, int partition, long offset) {
+    return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
+  }
 }
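
A behavioral sketch of the two new helpers (both are package-private, so this is illustration rather than public API), assuming topic "events", partition 3, offset 42:

byte[] id = KafkaIOUtils.getUniqueId("events", 3, 42L);
// id is the UTF-8 bytes of "events-3-42": unique per (topic, partition, offset).
byte[] limit = KafkaIOUtils.getOrderedCode(42L);
// limit is the OrderedCode encoding of 42; encoded offsets sort byte-wise in offset order.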

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 37 additions & 0 deletions

@@ -214,6 +214,10 @@ public boolean advance() throws IOException {
           curTimestamp =
               pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
           curRecord = record;
+          if (this.offsetDeduplication) {
+            curOffset = KafkaIOUtils.getOrderedCode(offset);
+            curId = KafkaIOUtils.getUniqueId(rawRecord.topic(), rawRecord.partition(), rawRecord.offset());
+          }
 
           int recordSize =
               (rawRecord.key() == null ? 0 : rawRecord.key().length)
@@ -299,6 +303,30 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
     return curTimestamp;
   }
 
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    if (curId == null) {
+      if (this.offsetDeduplication) {
+        throw new NoSuchElementException();
+      } else {
+        return new byte[0];
+      }
+    }
+    return curId;
+  }
+
+  @Override
+  public byte[] getCurrentRecordOffset() throws NoSuchElementException {
+    if (curOffset == null) {
+      if (this.offsetDeduplication) {
+        throw new NoSuchElementException();
+      } else {
+        return new byte[0];
+      }
+    }
+    return curOffset;
+  }
+
   @Override
   public long getSplitBacklogBytes() {
     long backlogBytes = 0;
@@ -314,6 +342,10 @@ public long getSplitBacklogBytes() {
     return backlogBytes;
   }
 
+  public boolean offsetDeduplication() {
+    return offsetDeduplication;
+  }
+
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -336,6 +368,10 @@ public long getSplitBacklogBytes() {
   private @Nullable Instant curTimestamp = null;
   private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();
 
+  private final boolean offsetDeduplication;
+  private byte[] curOffset = new byte[0];
+  private byte[] curId = new byte[0];
+
   private @Nullable Deserializer<K> keyDeserializerInstance = null;
   private @Nullable Deserializer<V> valueDeserializerInstance = null;
 
@@ -507,6 +543,7 @@ Instant updateAndGetWatermark() {
       KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
     this.source = source;
     this.name = "Reader-" + source.getId();
+    this.offsetDeduplication = source.offsetDeduplication();
 
     List<TopicPartition> partitions =
         Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java

Lines changed: 5 additions & 0 deletions

@@ -177,6 +177,11 @@ public boolean requiresDeduping() {
     return false;
   }
 
+  @Override
+  public boolean offsetDeduplication() {
+    return spec.isOffsetDeduplication();
+  }
+
   @Override
   public Coder<KafkaRecord<K, V>> getOutputCoder() {
     Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());

sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java

Lines changed: 6 additions & 0 deletions

@@ -101,6 +101,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl
           .addBooleanField("redistribute")
           .addBooleanField("allows_duplicates")
           .addNullableInt32Field("redistribute_num_keys")
+          .addBooleanField("offset_deduplication")
           .addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
           .addByteArrayField("timestamp_policy_factory")
           .addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -221,6 +222,7 @@ public Row toConfigRow(Read<?, ?> transform) {
       fieldValues.put("redistribute", transform.isRedistributed());
       fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
       fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
+      fieldValues.put("offset_deduplication", transform.isOffsetDeduplication());
       return Row.withSchema(schema).withFieldValues(fieldValues).build();
     }
 
@@ -349,6 +351,10 @@ public Row toConfigRow(Read<?, ?> transform) {
           }
         }
       }
+      Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
+      if (offsetDeduplication != null) {
+        transform = transform.withOffsetDeduplication(offsetDeduplication);
+      }
       Duration maxReadTime = configRow.getValue("max_read_time");
       if (maxReadTime != null) {
         transform =
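
A round-trip sketch of what the schema field and the two code paths above provide together: the flag survives translation to and from a config Row during a pipeline upgrade. The translator instance and surrounding harness here are hypothetical, not from the commit:

KafkaIO.Read<byte[], byte[]> original = KafkaIO.readBytes().withOffsetDeduplication(true);
// translator: a KafkaIOReadWithMetadataTranslator instance (hypothetical harness).
Row configRow = translator.toConfigRow(original);
KafkaIO.Read<?, ?> restored =
    (KafkaIO.Read<?, ?>) translator.fromConfigRow(configRow, PipelineOptionsFactory.create());
// restored.isOffsetDeduplication() == true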
