
Commit e13155a

VeronicaWasson authored and riathakkar committed
docs(samples): Add Dataflow snippet to read from multiple Kafka topics (GoogleCloudPlatform#12530)
1 parent b4e9f56 commit e13155a

File tree: 3 files changed, +132 -37 lines


dataflow/snippets/Dockerfile

Lines changed: 1 addition & 0 deletions
@@ -38,3 +38,4 @@ RUN apt-get update \
 COPY read_kafka.py ./
+COPY read_kafka_multi_topic.py ./
dataflow/snippets/read_kafka_multi_topic.py

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataflow_kafka_read_multi_topic]
import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))
# [END dataflow_kafka_read_multi_topic]


if __name__ == "__main__":
    read_from_kafka()
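The snippet above fans the merged stream out with one beam.Filter per topic. As a hedged aside, not part of this commit, the same split can be written with a single beam.Partition transform. The sketch below assumes the records come from ReadFromKafka(..., with_metadata=True) as in the snippet, so each record exposes .topic and .value; the TOPICS list and helper names are illustrative.

# Sketch only (not from the commit): route each Kafka record to a branch by
# topic index with beam.Partition, then decode the message values.
import apache_beam as beam

TOPICS = ["topic1", "topic2"]


def partition_by_topic(record, num_partitions, topics=tuple(TOPICS)):
    # beam.Partition calls this with (element, num_partitions); return the
    # index of the branch that should receive this record.
    return topics.index(record.topic)


def split_by_topic(all_topics):
    # Returns one PCollection of decoded message strings per topic,
    # in the same order as TOPICS.
    branches = all_topics | beam.Partition(partition_by_topic, len(TOPICS))
    return [
        branch | f"Decode {topic}" >> beam.Map(lambda record: record.value.decode("utf-8"))
        for topic, branch in zip(TOPICS, branches)
    ]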

dataflow/snippets/tests/test_read_kafka.py

Lines changed: 61 additions & 37 deletions
@@ -18,76 +18,100 @@
 
 import docker
 
+from docker import DockerClient
 from kafka import KafkaProducer
 from kafka.admin import KafkaAdminClient, NewTopic
 from kafka.errors import NoBrokersAvailable
 
 import pytest
 
 
-BOOTSTRAP_SERVER = "localhost:9092"
-TOPIC_NAME = f"topic-{uuid.uuid4()}"
-CONTAINER_IMAGE_NAME = "kafka-pipeline:1"
+BOOTSTRAP_SERVER = 'localhost:9092'
+TOPIC_NAMES = ['topic1', 'topic2']
+CONTAINER_IMAGE_NAME = 'kafka-pipeline:1'
 
 
-@pytest.fixture(scope="module", autouse=True)
-def kafka_container() -> None:
+@pytest.fixture(scope='module')
+def docker_client() -> DockerClient:
+    # Build a container image for the pipeline.
+    client = docker.from_env()
+    client.images.build(path='./', tag=CONTAINER_IMAGE_NAME)
+    yield client
+
+
+@pytest.fixture(scope='module', autouse=True)
+def kafka_container(docker_client: DockerClient) -> None:
     # Start a containerized Kafka server.
-    docker_client = docker.from_env()
-    container = docker_client.containers.run(
-        "apache/kafka:3.7.0", network_mode="host", detach=True
-    )
+    container = docker_client.containers.run('apache/kafka:3.7.0', network_mode='host', detach=True)
     try:
-        create_topic()
+        create_topics()
+        send_messages(TOPIC_NAMES[0])
+        send_messages(TOPIC_NAMES[1])
         yield
     finally:
         container.stop()
 
 
-def create_topic() -> None:
-    # Try to create a Kafka topic. We might need to wait for the Kafka service to start.
+@pytest.fixture
+def file_name_prefix() -> str:
+    return f'output-{uuid.uuid4()}'
+
+
+def create_topics() -> None:
+    # Try to create Kafka topics. We might need to wait for the Kafka service to start.
     for _ in range(1, 10):
         try:
             client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVER)
             topics = []
-            topics.append(
-                NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1)
-            )
+            topics.append(NewTopic(name=TOPIC_NAMES[0], num_partitions=1, replication_factor=1))
+            topics.append(NewTopic(name=TOPIC_NAMES[1], num_partitions=1, replication_factor=1))
             client.create_topics(topics)
             break
         except NoBrokersAvailable:
             time.sleep(5)
 
 
-def test_read_from_kafka(tmp_path: Path) -> None:
-    file_name_prefix = f"output-{uuid.uuid4()}"
-    file_name = f"{tmp_path}/{file_name_prefix}-00000-of-00001.txt"
-
+def send_messages(topic: str) -> None:
     # Send some messages to Kafka
     producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER)
     for i in range(0, 5):
-        message = f"event-{i}"
-        producer.send(TOPIC_NAME, message.encode())
-
-    # Build a container image for the pipeline.
-    client = docker.from_env()
-    client.images.build(path="./", tag=CONTAINER_IMAGE_NAME)
+        message = f'{topic}-{i}'
+        producer.send(topic, message.encode())
 
-    # Run the pipeline.
-    client.containers.run(
-        image=CONTAINER_IMAGE_NAME,
-        command=f"/pipeline/read_kafka.py --output /out/{file_name_prefix} --bootstrap_server {BOOTSTRAP_SERVER} --topic {TOPIC_NAME}",
-        volumes=["/var/run/docker.sock:/var/run/docker.sock", f"{tmp_path}/:/out"],
-        network_mode="host",
-        entrypoint="python",
-    )
 
+def verify_output(file_name: str, topic: str) -> None:
     # Verify the pipeline wrote the Kafka messages to the output file.
-    with open(file_name, "r") as f:
+    with open(file_name, 'r') as f:
         text = f.read()
         for i in range(0, 5):
-            assert f"event-{i}" in text
+            assert f'{topic}-{i}' in text
+
 
+def test_read_kafka(docker_client: DockerClient, tmp_path: Path, file_name_prefix: str) -> None:
+    topic = TOPIC_NAMES[0]
+
+    # Run the containerized Dataflow pipeline.
+    docker_client.containers.run(
+        image=CONTAINER_IMAGE_NAME,
+        command=f'/pipeline/read_kafka.py --output /out/{file_name_prefix} --bootstrap_server {BOOTSTRAP_SERVER} --topic {topic}',
+        volumes=['/var/run/docker.sock:/var/run/docker.sock', f'{tmp_path}/:/out'],
+        network_mode='host',
+        entrypoint='python')
 
-if __name__ == "__main__":
-    test_read_from_kafka()
+    # Verify the pipeline wrote the Kafka messages to the output file.
+    verify_output(f'{tmp_path}/{file_name_prefix}-00000-of-00001.txt', topic)
+
+
+def test_read_kafka_multi_topic(docker_client: DockerClient, tmp_path: Path, file_name_prefix: str) -> None:
+    # Run the containerized Dataflow pipeline.
+    docker_client.containers.run(
+        image=CONTAINER_IMAGE_NAME,
+        command=f'/pipeline/read_kafka_multi_topic.py --output /out/{file_name_prefix} --bootstrap_server {BOOTSTRAP_SERVER}',
+        volumes=['/var/run/docker.sock:/var/run/docker.sock', f'{tmp_path}/:/out'],
+        network_mode='host',
+        entrypoint='python')
+
+    # Verify the pipeline wrote the Kafka messages to the output files.
+    # This code snippet writes outputs to separate directories based on the topic name.
+    for topic in TOPIC_NAMES:
+        verify_output(f'{tmp_path}/{file_name_prefix}/{topic}/output-00000-of-00001.txt', topic)
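For reference, the file names asserted above follow WriteToText's default shard naming: the configured prefix, a zero-padded shard index and shard count, then the suffix. A minimal sketch of that convention under the snippet's num_shards=1 setting; the paths and helper name are illustrative only, not part of the commit.

# Sketch only: the single-shard path WriteToText produces for
# file_path_prefix='<output>/<topic>/output', file_name_suffix='.txt', num_shards=1.
def expected_output_path(output_dir: str, topic: str, shard: int = 0, num_shards: int = 1) -> str:
    return f"{output_dir}/{topic}/output-{shard:05d}-of-{num_shards:05d}.txt"


assert expected_output_path("/tmp/out", "topic1") == "/tmp/out/topic1/output-00000-of-00001.txt"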
