
Commit 4d87e04

Add deterministic redistribute sharding for KafkaIO read. (#36112)
* Add deterministic redistribute sharding for KafkaIO read.
* Address PR feedback.
* Provide more detailed transform name for the redistribute.
* Address spotless precommit findings.
* Address spotless precommit findings.
* Keep redistribute transform name the same.
* Add deterministic sharding unit test.
* Refactor to specific deterministic Kafka redistribute method.
* Add redistribute by key variant.
* Add test of sharding fns.
* Add bucketing to redistributeByKey and add option to redistribute by key.
* Actually enable withRedistributeByRecordKey in KafkaIOTest.
* Add byRecordKey property to Kafka read compatibility.
* Fix comma formatting for mkKafkaReadTransform.
* Fix cases where reader was not overwritten when building Kafka reader.
* Rebase and revert method rename for debugging.
* Address spotless finding for makeKafkaRecord.
* Add tests for deterministic sharding.
* numBuckets as UnsignedInteger to reduce conversion overhead, and clarify sharding Fn display name.
1 parent bb340c2 commit 4d87e04
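
For orientation, a hedged sketch of how a pipeline could opt into the behavior this commit adds. The broker address, topic, and String key/value types are placeholders; withBootstrapServers/withTopic/withRedistribute/withRedistributeNumKeys/withOffsetDeduplication are assumed from the existing KafkaIO API, and only withRedistributeByRecordKey is introduced here:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class RedistributeByRecordKeyExample {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<KafkaRecord<String, String>> records =
            pipeline.apply(
                KafkaIO.<String, String>read()
                    .withBootstrapServers("broker:9092") // placeholder broker
                    .withTopic("events") // placeholder topic
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withRedistribute()
                    .withRedistributeNumKeys(32)
                    .withOffsetDeduplication(true)
                    // New in this commit: shard deterministically by the Kafka
                    // record key instead of by a hash of the offset.
                    .withRedistributeByRecordKey(true));

        pipeline.run();
      }
    }

Per checkRedistributeConfiguration() in the diff below, withRedistributeByRecordKey is only valid when withRedistribute is set, and the deterministic KafkaReadRedistribute path is taken only when offset deduplication is enabled.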

File tree

9 files changed (+492, -28 lines)


sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java

Lines changed: 1 addition & 1 deletion
@@ -131,7 +131,7 @@ public void processElement(
   public static class RedistributeArbitrarily<T>
       extends PTransform<PCollection<T>, PCollection<T>> {
     // The number of buckets to shard into.
-    // A runner is free to ignore this (a runner may ignore the transorm
+    // A runner is free to ignore this (a runner may ignore the transform
     // entirely!) This is a performance optimization to prevent having
     // unit sized bundles on the output. If unset, uses a random integer key.
     private @Nullable Integer numBuckets = null;

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 42 additions & 11 deletions
@@ -79,6 +79,7 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Redistribute;
+import org.apache.beam.sdk.transforms.Redistribute.RedistributeArbitrarily;
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -730,6 +731,9 @@ public abstract static class Read<K, V>
     @Pure
     public abstract @Nullable Boolean getOffsetDeduplication();

+    @Pure
+    public abstract @Nullable Boolean getRedistributeByRecordKey();
+
     @Pure
     public abstract @Nullable Duration getWatchTopicPartitionDuration();

@@ -800,6 +804,8 @@ abstract Builder<K, V> setConsumerFactoryFn(

     abstract Builder<K, V> setOffsetDeduplication(Boolean offsetDeduplication);

+    abstract Builder<K, V> setRedistributeByRecordKey(Boolean redistributeByRecordKey);
+
     abstract Builder<K, V> setTimestampPolicyFactory(
         TimestampPolicyFactory<K, V> timestampPolicyFactory);

@@ -915,11 +921,15 @@ static <K, V> void setupExternalBuilder(
           && config.offsetDeduplication != null) {
         builder.setOffsetDeduplication(config.offsetDeduplication);
       }
+      if (config.redistribute && config.redistributeByRecordKey != null) {
+        builder.setRedistributeByRecordKey(config.redistributeByRecordKey);
+      }
     } else {
       builder.setRedistributed(false);
       builder.setRedistributeNumKeys(0);
       builder.setAllowDuplicates(false);
       builder.setOffsetDeduplication(false);
+      builder.setRedistributeByRecordKey(false);
     }
   }

@@ -989,6 +999,7 @@ public static class Configuration {
     private Boolean redistribute;
     private Boolean allowDuplicates;
     private Boolean offsetDeduplication;
+    private Boolean redistributeByRecordKey;
     private Long dynamicReadPollIntervalSeconds;

     public void setConsumerConfig(Map<String, String> consumerConfig) {
@@ -1051,6 +1062,10 @@ public void setOffsetDeduplication(Boolean offsetDeduplication) {
       this.offsetDeduplication = offsetDeduplication;
     }

+    public void setRedistributeByRecordKey(Boolean redistributeByRecordKey) {
+      this.redistributeByRecordKey = redistributeByRecordKey;
+    }
+
     public void setDynamicReadPollIntervalSeconds(Long dynamicReadPollIntervalSeconds) {
       this.dynamicReadPollIntervalSeconds = dynamicReadPollIntervalSeconds;
     }
@@ -1161,6 +1176,10 @@ public Read<K, V> withOffsetDeduplication(Boolean offsetDeduplication) {
       return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
     }

+    public Read<K, V> withRedistributeByRecordKey(Boolean redistributeByRecordKey) {
+      return toBuilder().setRedistributeByRecordKey(redistributeByRecordKey).build();
+    }
+
     /**
      * Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
      * from each of the matching topics are read.
@@ -1679,6 +1698,11 @@ private void checkRedistributeConfiguration() {
       LOG.warn(
           "Offsets used for deduplication are available in WindowedValue's metadata. Combining, aggregating, mutating them may risk with data loss.");
     }
+    if (getRedistributeByRecordKey() != null && getRedistributeByRecordKey()) {
+      checkState(
+          isRedistributed(),
+          "withRedistributeByRecordKey can only be used when withRedistribute is set.");
+    }
   }

   private void warnAboutUnsafeConfigurations(PBegin input) {
@@ -1858,18 +1882,25 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
             "Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled");
       }

-      if (kafkaRead.getRedistributeNumKeys() == 0) {
-        return output.apply(
-            "Insert Redistribute",
-            Redistribute.<KafkaRecord<K, V>>arbitrarily()
-                .withAllowDuplicates(kafkaRead.isAllowDuplicates()));
-      } else {
-        return output.apply(
-            "Insert Redistribute with Shards",
-            Redistribute.<KafkaRecord<K, V>>arbitrarily()
-                .withAllowDuplicates(kafkaRead.isAllowDuplicates())
-                .withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
+      if (kafkaRead.getOffsetDeduplication() != null && kafkaRead.getOffsetDeduplication()) {
+        if (kafkaRead.getRedistributeByRecordKey() != null
+            && kafkaRead.getRedistributeByRecordKey()) {
+          return output.apply(
+              KafkaReadRedistribute.<K, V>byRecordKey(kafkaRead.getRedistributeNumKeys()));
+        } else {
+          return output.apply(
+              KafkaReadRedistribute.<K, V>byOffsetShard(kafkaRead.getRedistributeNumKeys()));
+        }
+      }
+      RedistributeArbitrarily<KafkaRecord<K, V>> redistribute =
+          Redistribute.<KafkaRecord<K, V>>arbitrarily()
+              .withAllowDuplicates(kafkaRead.isAllowDuplicates());
+      String redistributeName = "Insert Redistribute";
+      if (kafkaRead.getRedistributeNumKeys() != 0) {
+        redistribute = redistribute.withNumBuckets((int) kafkaRead.getRedistributeNumKeys());
+        redistributeName = "Insert Redistribute with Shards";
       }
+      return output.apply(redistributeName, redistribute);
     }
     return output;
   }
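
Net effect of this hunk: when offset deduplication is enabled, the read routes through the new deterministic KafkaReadRedistribute, sharding by record key when withRedistributeByRecordKey is set and by an offset hash otherwise. All other redistribute configurations keep the existing arbitrary redistribute, now built once and named "Insert Redistribute" or "Insert Redistribute with Shards" depending on whether getRedistributeNumKeys() is nonzero.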

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java

Lines changed: 6 additions & 0 deletions
@@ -139,6 +139,12 @@ Object getDefaultValue() {
     },
     OFFSET_DEDUPLICATION(LEGACY),
     LOG_TOPIC_VERIFICATION,
+    REDISTRIBUTE_BY_RECORD_KEY {
+      @Override
+      Object getDefaultValue() {
+        return false;
+      }
+    },
     ;

   private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
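
The new property's false default presumably keeps the compatibility resolution unchanged for reads that never call withRedistributeByRecordKey; unlike OFFSET_DEDUPLICATION, it does not appear to be pinned to the LEGACY implementation.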
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadRedistribute.java

Lines changed: 124 additions & 0 deletions
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Redistribute;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class KafkaReadRedistribute<K, V>
+    extends PTransform<PCollection<KafkaRecord<K, V>>, PCollection<KafkaRecord<K, V>>> {
+  public static <K, V> KafkaReadRedistribute<K, V> byOffsetShard(@Nullable Integer numBuckets) {
+    return new KafkaReadRedistribute<>(numBuckets, false);
+  }
+
+  public static <K, V> KafkaReadRedistribute<K, V> byRecordKey(@Nullable Integer numBuckets) {
+    return new KafkaReadRedistribute<>(numBuckets, true);
+  }
+
+  // The number of buckets to shard into.
+  private @Nullable Integer numBuckets = null;
+  // When redistributing, group records by the Kafka record's key instead of by offset hash.
+  private boolean byRecordKey = false;
+
+  private KafkaReadRedistribute(@Nullable Integer numBuckets, boolean byRecordKey) {
+    this.numBuckets = numBuckets;
+    this.byRecordKey = byRecordKey;
+  }
+
+  @Override
+  public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaRecord<K, V>> input) {
+
+    if (byRecordKey) {
+      return input
+          .apply("Pair with shard from key", ParDo.of(new AssignRecordKeyFn<K, V>(numBuckets)))
+          .apply(Redistribute.<Integer, KafkaRecord<K, V>>byKey().withAllowDuplicates(false))
+          .apply(Values.create());
+    }
+
+    return input
+        .apply("Pair with shard from offset", ParDo.of(new AssignOffsetShardFn<K, V>(numBuckets)))
+        .apply(Redistribute.<Integer, KafkaRecord<K, V>>byKey().withAllowDuplicates(false))
+        .apply(Values.create());
+  }
+
+  static class AssignOffsetShardFn<K, V>
+      extends DoFn<KafkaRecord<K, V>, KV<Integer, KafkaRecord<K, V>>> {
+    private @NonNull UnsignedInteger numBuckets;
+
+    public AssignOffsetShardFn(@Nullable Integer numBuckets) {
+      if (numBuckets != null && numBuckets > 0) {
+        this.numBuckets = UnsignedInteger.fromIntBits(numBuckets);
+      } else {
+        this.numBuckets = UnsignedInteger.valueOf(0);
+      }
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KafkaRecord<K, V> element,
+        OutputReceiver<KV<Integer, KafkaRecord<K, V>>> receiver) {
+      int hash = Hashing.farmHashFingerprint64().hashLong(element.getOffset()).asInt();
+
+      if (numBuckets != null) {
+        hash = UnsignedInteger.fromIntBits(hash).mod(numBuckets).intValue();
+      }
+
+      receiver.output(KV.of(hash, element));
+    }
+  }
+
+  static class AssignRecordKeyFn<K, V>
+      extends DoFn<KafkaRecord<K, V>, KV<Integer, KafkaRecord<K, V>>> {
+
+    private @NonNull UnsignedInteger numBuckets;
+
+    public AssignRecordKeyFn(@Nullable Integer numBuckets) {
+      if (numBuckets != null && numBuckets > 0) {
+        this.numBuckets = UnsignedInteger.fromIntBits(numBuckets);
+      } else {
+        this.numBuckets = UnsignedInteger.valueOf(0);
+      }
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KafkaRecord<K, V> element,
+        OutputReceiver<KV<Integer, KafkaRecord<K, V>>> receiver) {
+      K key = element.getKV().getKey();
+      String keyString = key == null ? "" : key.toString();
+      int hash = Hashing.farmHashFingerprint64().hashBytes(keyString.getBytes(UTF_8)).asInt();
+
+      if (numBuckets != null) {
+        hash = UnsignedInteger.fromIntBits(hash).mod(numBuckets).intValue();
+      }
+
+      receiver.output(KV.of(hash, element));
+    }
+  }
+}
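
To make the determinism concrete, a minimal standalone sketch of the bucket math used by AssignOffsetShardFn and AssignRecordKeyFn above. It uses plain Guava rather than Beam's vendored copy, assumes a positive numBuckets, and ShardingSketch/offsetBucket/keyBucket are illustrative names, not part of the commit:

    import static java.nio.charset.StandardCharsets.UTF_8;

    import com.google.common.hash.Hashing;
    import com.google.common.primitives.UnsignedInteger;

    public class ShardingSketch {
      // Same math as AssignOffsetShardFn: hash the offset with FarmHash, then
      // reduce into numBuckets using unsigned arithmetic so that negative int
      // hashes still map to a valid bucket.
      static int offsetBucket(long offset, int numBuckets) {
        int hash = Hashing.farmHashFingerprint64().hashLong(offset).asInt();
        return UnsignedInteger.fromIntBits(hash)
            .mod(UnsignedInteger.fromIntBits(numBuckets))
            .intValue();
      }

      // Same math as AssignRecordKeyFn: a null record key hashes as the empty string.
      static int keyBucket(String key, int numBuckets) {
        String keyString = key == null ? "" : key;
        int hash = Hashing.farmHashFingerprint64().hashBytes(keyString.getBytes(UTF_8)).asInt();
        return UnsignedInteger.fromIntBits(hash)
            .mod(UnsignedInteger.fromIntBits(numBuckets))
            .intValue();
      }

      public static void main(String[] args) {
        // The same offset or key always lands in the same bucket.
        System.out.println(offsetBucket(42L, 8) == offsetBucket(42L, 8)); // true
        System.out.println(keyBucket("user-1", 8) == keyBucket("user-1", 8)); // true
      }
    }

Because farmHashFingerprint64 is a fixed fingerprint function rather than a seeded hash, bucket assignment is stable across workers and across runs, which is what makes this redistribute deterministic where Redistribute.arbitrarily() is not.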

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java

Lines changed: 3 additions & 3 deletions
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kafka;

-import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransform;
 import static org.apache.beam.sdk.io.kafka.KafkaIOTest.mkKafkaReadTransformWithOffsetDedup;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -109,15 +108,16 @@ private PipelineResult testReadTransformCreationWithImplementationBoundPropertie
       Function<KafkaIO.Read<Integer, Long>, KafkaIO.Read<Integer, Long>> kafkaReadDecorator) {
     p.apply(
         kafkaReadDecorator.apply(
-            mkKafkaReadTransform(
+            KafkaIOTest.mkKafkaReadTransform(
                 1000,
                 null,
                 new ValueAsTimestampFn(),
                 false, /*redistribute*/
                 false, /*allowDuplicates*/
                 0, /*numKeys*/
                 null, /*offsetDeduplication*/
-                null /*topics*/)));
+                null, /*topics*/
+                null /*redistributeByRecordKey*/)));
     return p.run();
   }
