Skip to content

Commit f89f6fa

Browse files
docs(samples): Read multiple Kafka topics from Dataflow (#9511)
1 parent 4d10950 commit f89f6fa

File tree

3 files changed

+154
-20
lines changed

3 files changed

+154
-20
lines changed

dataflow/snippets/pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,10 @@
163163
<version>${apache_beam.version}</version>
164164
</dependency>
165165
<dependency>
166-
<!-- For sending Kafka messages in the integration test -->
166+
<!-- For initializing the KafkaIO connector -->
167167
<groupId>org.apache.kafka</groupId>
168168
<artifactId>kafka-clients</artifactId>
169169
<version>3.8.0</version>
170-
<scope>test</scope>
171170
</dependency>
172171
<dependency>
173172
<!-- For running containerized Kafka instance in the integration test -->
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
// [START dataflow_kafka_read_multi_topic]
20+
import java.util.List;
21+
import org.apache.beam.sdk.Pipeline;
22+
import org.apache.beam.sdk.PipelineResult;
23+
import org.apache.beam.sdk.io.TextIO;
24+
import org.apache.beam.sdk.io.kafka.KafkaIO;
25+
import org.apache.beam.sdk.options.Description;
26+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
27+
import org.apache.beam.sdk.options.StreamingOptions;
28+
import org.apache.beam.sdk.transforms.Filter;
29+
import org.apache.beam.sdk.transforms.MapElements;
30+
import org.apache.beam.sdk.values.TypeDescriptors;
31+
import org.apache.kafka.common.serialization.LongDeserializer;
32+
import org.apache.kafka.common.serialization.StringDeserializer;
33+
import org.joda.time.Duration;
34+
import org.joda.time.Instant;
35+
36+
public class KafkaReadTopics {
37+
38+
// [END dataflow_kafka_read_multi_topic]
39+
public interface Options extends StreamingOptions {
40+
@Description("The Kafka bootstrap server. Example: localhost:9092")
41+
String getBootstrapServer();
42+
43+
void setBootstrapServer(String value);
44+
45+
@Description("The first Kafka topic to read from.")
46+
String getTopic1();
47+
48+
void setTopic1(String value);
49+
50+
@Description("The second Kafka topic to read from.")
51+
String getTopic2();
52+
53+
void setTopic2(String value);
54+
}
55+
56+
public static PipelineResult.State main(String[] args) {
57+
// Parse the pipeline options passed into the application. Example:
58+
// --bootstrapServer=$BOOTSTRAP_SERVER --topic1=$KAFKA_TOPIC_1 --topic2=$KAFKA_TOPIC_2
59+
// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
60+
var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
61+
options.setStreaming(true);
62+
63+
Pipeline pipeline = createPipeline(options);
64+
return pipeline.run().waitUntilFinish();
65+
}
66+
67+
// [START dataflow_kafka_read_multi_topic]
68+
public static Pipeline createPipeline(Options options) {
69+
String topic1 = options.getTopic1();
70+
String topic2 = options.getTopic2();
71+
72+
// Build the pipeline.
73+
var pipeline = Pipeline.create(options);
74+
var allTopics = pipeline
75+
.apply(KafkaIO.<Long, String>read()
76+
.withTopics(List.of(topic1, topic2))
77+
.withBootstrapServers(options.getBootstrapServer())
78+
.withKeyDeserializer(LongDeserializer.class)
79+
.withValueDeserializer(StringDeserializer.class)
80+
.withMaxReadTime(Duration.standardSeconds(10))
81+
.withStartReadTime(Instant.EPOCH)
82+
);
83+
84+
// Create separate pipeline branches for each topic.
85+
// The first branch filters on topic1.
86+
allTopics
87+
.apply(Filter.by(record -> record.getTopic().equals(topic1)))
88+
.apply(MapElements
89+
.into(TypeDescriptors.strings())
90+
.via(record -> record.getKV().getValue()))
91+
.apply(TextIO.write()
92+
.to(topic1)
93+
.withSuffix(".txt")
94+
.withNumShards(1)
95+
);
96+
97+
// The second branch filters on topic2.
98+
allTopics
99+
.apply(Filter.by(record -> record.getTopic().equals(topic2)))
100+
.apply(MapElements
101+
.into(TypeDescriptors.strings())
102+
.via(record -> record.getKV().getValue()))
103+
.apply(TextIO.write()
104+
.to(topic2)
105+
.withSuffix(".txt")
106+
.withNumShards(1)
107+
);
108+
return pipeline;
109+
}
110+
}
111+
// [END dataflow_kafka_read_multi_topic]

dataflow/snippets/src/test/java/com/example/dataflow/KafkaReadIT.java

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,13 @@
3939
import org.testcontainers.utility.DockerImageName;
4040

4141
public class KafkaReadIT {
42-
private static final String TOPIC_NAME = "topic-" + UUID.randomUUID();
42+
private static final String[] TOPIC_NAMES = {
43+
"topic-" + UUID.randomUUID(),
44+
"topic-" + UUID.randomUUID()
45+
};
4346

44-
private static final String OUTPUT_FILE_NAME_PREFIX = UUID.randomUUID().toString();
45-
private static final String OUTPUT_FILE_NAME = OUTPUT_FILE_NAME_PREFIX + "-00000-of-00001.txt";
47+
// The TextIO connector appends this suffix to the pipeline output file.
48+
private static final String OUTPUT_FILE_SUFFIX = "-00000-of-00001.txt";
4649

4750
private static KafkaContainer kafka;
4851
private static String bootstrapServer;
@@ -54,40 +57,61 @@ public void setUp() throws ExecutionException, InterruptedException {
5457
kafka.start();
5558
bootstrapServer = kafka.getBootstrapServers();
5659

57-
// Create a topic.
60+
// Create topics.
5861
Properties properties = new Properties();
5962
properties.put("bootstrap.servers", bootstrapServer);
6063
AdminClient adminClient = AdminClient.create(properties);
61-
var topic = new NewTopic(TOPIC_NAME, 1, (short) 1);
62-
adminClient.createTopics(Arrays.asList(topic));
64+
for (String topicName : TOPIC_NAMES) {
65+
var topic = new NewTopic(topicName, 1, (short) 1);
66+
adminClient.createTopics(Arrays.asList(topic));
67+
}
6368

64-
// Send a message to the topic.
65-
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
69+
// Send messages to the topics.
70+
properties.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
6671
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
67-
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
68-
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-0", "event-0");
69-
Future future = producer.send(record);
70-
future.get();
72+
KafkaProducer<Long, String> producer = new KafkaProducer<>(properties);
73+
for (String topicName : TOPIC_NAMES) {
74+
var record = new ProducerRecord<>(topicName, 0L, topicName + "-event-0");
75+
Future future = producer.send(record);
76+
future.get();
77+
}
7178
}
7279

7380
@After
7481
public void tearDown() throws IOException {
7582
kafka.stop();
76-
Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME));
83+
for (String topicName : TOPIC_NAMES) {
84+
Files.deleteIfExists(Paths.get(topicName + OUTPUT_FILE_SUFFIX));
85+
}
7786
}
7887

7988
@Test
8089
public void testApacheKafkaRead() throws IOException {
8190
PipelineResult.State state = KafkaRead.main(new String[] {
8291
"--runner=DirectRunner",
8392
"--bootstrapServer=" + bootstrapServer,
84-
"--topic=" + TOPIC_NAME,
85-
"--outputPath=" + OUTPUT_FILE_NAME_PREFIX
93+
"--topic=" + TOPIC_NAMES[0],
94+
"--outputPath=" + TOPIC_NAMES[0] // Use the topic name as the output file name.
8695
});
8796
assertEquals(PipelineResult.State.DONE, state);
97+
verifyOutput(TOPIC_NAMES[0]);
98+
}
99+
100+
@Test
101+
public void testApacheKafkaReadTopics() throws IOException {
102+
PipelineResult.State state = KafkaReadTopics.main(new String[] {
103+
"--runner=DirectRunner",
104+
"--bootstrapServer=" + bootstrapServer,
105+
"--topic1=" + TOPIC_NAMES[0],
106+
"--topic2=" + TOPIC_NAMES[1]
107+
});
108+
assertEquals(PipelineResult.State.DONE, state);
109+
verifyOutput(TOPIC_NAMES[0]);
110+
verifyOutput(TOPIC_NAMES[1]);
111+
}
88112

89-
// Verify the pipeline wrote the output.
90-
String output = Files.readString(Paths.get(OUTPUT_FILE_NAME));
91-
assertTrue(output.contains("event-0"));
113+
private void verifyOutput(String topic) throws IOException {
114+
String output = Files.readString(Paths.get(topic + OUTPUT_FILE_SUFFIX));
115+
assertTrue(output.contains(topic + "-event-0"));
92116
}
93117
}

0 commit comments

Comments
 (0)