Skip to content

Commit 2b4056b

Browse files
authored
Merge pull request #35214: Allow STRING format in Kafka config
2 parents c786cd0 + 5572ad8 commit 2b4056b

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ public void validate() {
7171
} else if (dataFormat != null && dataFormat.equals("RAW")) {
7272
checkArgument(
7373
inputSchema == null, "To read from Kafka in RAW format, you can't provide a schema.");
74+
} else if (dataFormat != null && dataFormat.equals("STRING")) {
75+
checkArgument(
76+
inputSchema == null, "To read from Kafka in STRING format, you can't provide a schema.");
7477
} else if (dataFormat != null && dataFormat.equals("JSON")) {
7578
checkNotNull(inputSchema, "To read from Kafka in JSON format, you must provide a schema.");
7679
} else if (dataFormat != null && dataFormat.equals("PROTO")) {

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,24 @@ public void testBuildTransformWithRawFormat() {
197197
.build());
198198
}
199199

200+
@Test
201+
public void testBuildTransformWithStringFormat() {
202+
ServiceLoader<SchemaTransformProvider> serviceLoader =
203+
ServiceLoader.load(SchemaTransformProvider.class);
204+
List<SchemaTransformProvider> providers =
205+
StreamSupport.stream(serviceLoader.spliterator(), false)
206+
.filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
207+
.collect(Collectors.toList());
208+
KafkaReadSchemaTransformProvider kafkaProvider =
209+
(KafkaReadSchemaTransformProvider) providers.get(0);
210+
kafkaProvider.from(
211+
KafkaReadSchemaTransformConfiguration.builder()
212+
.setTopic("anytopic")
213+
.setBootstrapServers("anybootstrap")
214+
.setFormat("STRING")
215+
.build());
216+
}
217+
200218
@Test
201219
public void testBuildTransformWithProtoFormat() {
202220
ServiceLoader<SchemaTransformProvider> serviceLoader =
@@ -300,15 +318,16 @@ public void testBuildTransformWithManaged() {
300318
List<String> configs =
301319
Arrays.asList(
302320
"topic: topic_1\n" + "bootstrap_servers: some bootstrap\n" + "format: RAW",
303-
"topic: topic_2\n"
321+
"topic: topic_2\n" + "bootstrap_servers: some bootstrap\n" + "format: STRING",
322+
"topic: topic_3\n"
304323
+ "bootstrap_servers: some bootstrap\n"
305324
+ "schema: '{\"type\":\"record\",\"name\":\"my_record\",\"fields\":[{\"name\":\"bool\",\"type\":\"boolean\"}]}'",
306-
"topic: topic_3\n"
325+
"topic: topic_4\n"
307326
+ "bootstrap_servers: some bootstrap\n"
308327
+ "schema_registry_url: some-url\n"
309328
+ "schema_registry_subject: some-subject\n"
310329
+ "format: RAW",
311-
"topic: topic_4\n"
330+
"topic: topic_5\n"
312331
+ "bootstrap_servers: some bootstrap\n"
313332
+ "format: PROTO\n"
314333
+ "schema: '"

0 commit comments

Comments
 (0)