Skip to content

Commit 575df4e

Browse files
authored
Set default redistribute key limit for KafkaIO read. (#36124)
1 parent bdf1be3 commit 575df4e

File tree

2 files changed

+83
-5
lines changed

2 files changed

+83
-5
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,14 @@ public static <K, V> WriteRecords<K, V> writeRecords() {
655655

656656
///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
657657

658+
/**
659+
* Default number of keys to redistribute Kafka inputs into.
660+
*
661+
* <p>This value is used when {@link Read#withRedistribute()} is used without {@link
662+
* Read#withRedistributeNumKeys(int redistributeNumKeys)}.
663+
*/
664+
private static final int DEFAULT_REDISTRIBUTE_NUM_KEYS = 32768;
665+
658666
/**
659667
* A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more information on
660668
* usage and configuration.
@@ -1099,7 +1107,11 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
10991107
* @return an updated {@link Read} transform.
11001108
*/
11011109
public Read<K, V> withRedistribute() {
1102-
return toBuilder().setRedistributed(true).build();
1110+
Builder<K, V> builder = toBuilder().setRedistributed(true);
1111+
if (getRedistributeNumKeys() == 0) {
1112+
builder = builder.setRedistributeNumKeys(DEFAULT_REDISTRIBUTE_NUM_KEYS);
1113+
}
1114+
return builder.build();
11031115
}
11041116

11051117
/**
@@ -1121,10 +1133,11 @@ public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
11211133
* Redistributes Kafka messages into a distinct number of keys for processing in subsequent
11221134
* steps.
11231135
*
1124-
* <p>Specifying an explicit number of keys is generally recommended over redistributing into an
1125-
* unbounded key space.
1136+
* <p>If unset, defaults to {@link KafkaIO#DEFAULT_REDISTRIBUTE_NUM_KEYS}.
11261137
*
1127-
* <p>Must be used with {@link KafkaIO#withRedistribute()}.
1138+
* <p>Use zero to disable bucketing into a distinct number of keys.
1139+
*
1140+
* <p>Must be used with {@link Read#withRedistribute()}.
11281141
*
11291142
* @param redistributeNumKeys specifies the total number of keys for redistributing inputs.
11301143
* @return an updated {@link Read} transform.
@@ -2667,13 +2680,30 @@ public ReadSourceDescriptors<K, V> withProcessingTime() {
26672680

26682681
/** Enable Redistribute. */
26692682
public ReadSourceDescriptors<K, V> withRedistribute() {
2670-
return toBuilder().setRedistribute(true).build();
2683+
Builder<K, V> builder = toBuilder().setRedistribute(true);
2684+
if (getRedistributeNumKeys() == 0) {
2685+
builder = builder.setRedistributeNumKeys(DEFAULT_REDISTRIBUTE_NUM_KEYS);
2686+
}
2687+
return builder.build();
26712688
}
26722689

26732690
public ReadSourceDescriptors<K, V> withAllowDuplicates() {
26742691
return toBuilder().setAllowDuplicates(true).build();
26752692
}
26762693

2694+
/**
2695+
* Redistributes Kafka messages into a distinct number of keys for processing in subsequent
2696+
* steps.
2697+
*
2698+
* <p>If unset, defaults to {@link KafkaIO#DEFAULT_REDISTRIBUTE_NUM_KEYS}.
2699+
*
2700+
* <p>Use zero to disable bucketing into a distinct number of keys.
2701+
*
2702+
* <p>Must be used with {@link ReadSourceDescriptors#withRedistribute()}.
2703+
*
2704+
* @param redistributeNumKeys specifies the total number of keys for redistributing inputs.
2705+
* @return an updated {@link Read} transform.
2706+
*/
26772707
public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
26782708
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
26792709
}

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.hamcrest.Matchers.matchesPattern;
3131
import static org.hamcrest.Matchers.not;
3232
import static org.junit.Assert.assertEquals;
33+
import static org.junit.Assert.assertFalse;
3334
import static org.junit.Assert.assertNotNull;
3435
import static org.junit.Assert.assertNull;
3536
import static org.junit.Assert.assertTrue;
@@ -792,6 +793,53 @@ public void testNumKeysIgnoredWithRedistributeNotEnabled() {
792793
p.run();
793794
}
794795

796+
@Test
797+
public void testDefaultRedistributeNumKeys() {
798+
int numElements = 1000;
799+
// Redistribute is not used and does not modify the read transform further.
800+
KafkaIO.Read<Integer, Long> read =
801+
mkKafkaReadTransform(
802+
numElements,
803+
numElements,
804+
new ValueAsTimestampFn(),
805+
false, /*redistribute*/
806+
false, /*allowDuplicates*/
807+
null, /*numKeys*/
808+
null, /*offsetDeduplication*/
809+
null /*topics*/);
810+
assertFalse(read.isRedistributed());
811+
assertEquals(0, read.getRedistributeNumKeys());
812+
813+
// Redistribute is used and defaulted the number of keys due to no user setting.
814+
read =
815+
mkKafkaReadTransform(
816+
numElements,
817+
numElements,
818+
new ValueAsTimestampFn(),
819+
true, /*redistribute*/
820+
false, /*allowDuplicates*/
821+
null, /*numKeys*/
822+
null, /*offsetDeduplication*/
823+
null /*topics*/);
824+
assertTrue(read.isRedistributed());
825+
// Default is defined by DEFAULT_REDISTRIBUTE_NUM_KEYS in KafkaIO.
826+
assertEquals(32768, read.getRedistributeNumKeys());
827+
828+
// Redistribute is set with user-specified the number of keys.
829+
read =
830+
mkKafkaReadTransform(
831+
numElements,
832+
numElements,
833+
new ValueAsTimestampFn(),
834+
true, /*redistribute*/
835+
false, /*allowDuplicates*/
836+
10, /*numKeys*/
837+
null, /*offsetDeduplication*/
838+
null /*topics*/);
839+
assertTrue(read.isRedistributed());
840+
assertEquals(10, read.getRedistributeNumKeys());
841+
}
842+
795843
@Test
796844
public void testDisableRedistributeKafkaOffsetLegacy() {
797845
thrown.expect(Exception.class);

0 commit comments

Comments
 (0)