Commit 6043660

Add schema provider support for Kafka redistribute options (#36332)
* Add deterministic sharding unit test.
* Refactor to specific deterministic Kafka redistribute method.
* Add redistribute by key variant.
* Actually enable withRedistributeByRecordKey in KafkaIOTest.
* Add byRecordKey property to Kafka read compatibility.
* Rebase and revert method rename for debugging.
* Add schema provider for redistribute options
* Address spotless findings to simplify boolean expressions
* Revert accidental changes from merge conflict resolution
* Refactor into helper method.
Parent: 7174991 · Commit: 6043660

3 files changed: 65 additions & 1 deletion

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java

Lines changed: 30 additions & 0 deletions
@@ -160,6 +160,26 @@ public static Builder builder() {
   @Nullable
   public abstract ErrorHandling getErrorHandling();
 
+  @SchemaFieldDescription("If the Kafka read should be redistributed.")
+  @Nullable
+  public abstract Boolean getRedistributed();
+
+  @SchemaFieldDescription("If the Kafka read allows duplicates.")
+  @Nullable
+  public abstract Boolean getAllowDuplicates();
+
+  @SchemaFieldDescription("The number of keys for redistributing Kafka inputs.")
+  @Nullable
+  public abstract Integer getRedistributeNumKeys();
+
+  @SchemaFieldDescription("If the redistribute is using offset deduplication mode.")
+  @Nullable
+  public abstract Boolean getOffsetDeduplication();
+
+  @SchemaFieldDescription("If the redistribute should be keyed by the Kafka record key.")
+  @Nullable
+  public abstract Boolean getRedistributeByRecordKey();
+
   /** Builder for the {@link KafkaReadSchemaTransformConfiguration}. */
   @AutoValue.Builder
   public abstract static class Builder {
@@ -190,6 +210,16 @@ public abstract static class Builder {
 
     public abstract Builder setErrorHandling(ErrorHandling errorHandling);
 
+    public abstract Builder setRedistributed(Boolean redistribute);
+
+    public abstract Builder setAllowDuplicates(Boolean allowDuplicates);
+
+    public abstract Builder setRedistributeNumKeys(Integer redistributeNumKeys);
+
+    public abstract Builder setOffsetDeduplication(Boolean offsetDeduplication);
+
+    public abstract Builder setRedistributeByRecordKey(Boolean redistributeByRecordKey);
+
     /** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. */
     public abstract KafkaReadSchemaTransformConfiguration build();
   }
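
For illustration, a minimal sketch (not part of this commit) of setting the new options through the generated builder. The broker address, topic, and format values are placeholders, and the pre-existing setters for them are assumed from the rest of the configuration class:

import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformConfiguration;

// Hypothetical usage of the new redistribute options alongside existing setters.
KafkaReadSchemaTransformConfiguration config =
    KafkaReadSchemaTransformConfiguration.builder()
        .setBootstrapServers("localhost:9092") // placeholder broker
        .setTopic("my-topic")                  // placeholder topic
        .setFormat("RAW")                      // assumed existing format setter
        .setRedistributed(true)                // enables the redistribute step
        .setRedistributeNumKeys(32)            // shard the read across 32 keys
        .setAllowDuplicates(false)             // keep deduplicated delivery
        .setRedistributeByRecordKey(true)      // key shards by the Kafka record key
        .build();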

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java

Lines changed: 29 additions & 0 deletions
@@ -166,6 +166,31 @@ private SchemaRegistryProvider getSchemaRegistryProvider(String confluentSchemaR
     return SchemaRegistryProvider.UNSPECIFIED;
   }
 
+  private static <K, V> KafkaIO.Read<K, V> applyRedistributeSettings(
+      KafkaIO.Read<K, V> kafkaRead, KafkaReadSchemaTransformConfiguration configuration) {
+    Boolean redistribute = configuration.getRedistributed();
+    if (redistribute != null && redistribute) {
+      kafkaRead = kafkaRead.withRedistribute();
+    }
+    Integer redistributeNumKeys = configuration.getRedistributeNumKeys();
+    if (redistributeNumKeys != null && redistributeNumKeys > 0) {
+      kafkaRead = kafkaRead.withRedistributeNumKeys(redistributeNumKeys);
+    }
+    Boolean allowDuplicates = configuration.getAllowDuplicates();
+    if (allowDuplicates != null) {
+      kafkaRead = kafkaRead.withAllowDuplicates(allowDuplicates);
+    }
+    Boolean redistributeByRecordKey = configuration.getRedistributeByRecordKey();
+    if (redistributeByRecordKey != null) {
+      kafkaRead = kafkaRead.withRedistributeByRecordKey(redistributeByRecordKey);
+    }
+    Boolean offsetDeduplication = configuration.getOffsetDeduplication();
+    if (offsetDeduplication != null) {
+      kafkaRead = kafkaRead.withOffsetDeduplication(offsetDeduplication);
+    }
+    return kafkaRead;
+  }
+
   @Override
   public PCollectionRowTuple expand(PCollectionRowTuple input) {
     configuration.validate();
@@ -233,6 +258,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(maxReadTimeSeconds));
     }
 
+    kafkaRead = applyRedistributeSettings(kafkaRead, configuration);
+
     PCollection<GenericRecord> kafkaValues =
         input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
 
@@ -283,6 +310,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(maxReadTimeSeconds));
     }
 
+    kafkaRead = applyRedistributeSettings(kafkaRead, configuration);
+
     PCollection<byte[]> kafkaValues =
         input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java

Lines changed: 6 additions & 1 deletion
@@ -130,7 +130,12 @@ public void testFindTransformAndMakeItWork() {
             "error_handling",
             "file_descriptor_path",
             "message_name",
-            "max_read_time_seconds"),
+            "max_read_time_seconds",
+            "redistributed",
+            "allow_duplicates",
+            "offset_deduplication",
+            "redistribute_num_keys",
+            "redistribute_by_record_key"),
         kafkaProvider.configurationSchema().getFields().stream()
             .map(field -> field.getName())
             .collect(Collectors.toSet()));
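
The snake_case names asserted here are the schema-inferred forms of the camelCase AutoValue getters (e.g. getRedistributeNumKeys() surfaces as "redistribute_num_keys"), as this very assertion demonstrates. A small sketch of listing them, using only calls already exercised by the test:

import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider;

// Print every configuration field name the provider exposes, including the
// five redistribute fields added by this commit.
KafkaReadSchemaTransformProvider provider = new KafkaReadSchemaTransformProvider();
provider.configurationSchema().getFields().stream()
    .map(field -> field.getName())
    .forEach(System.out::println);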
