@@ -18,76 +18,100 @@
 
 import docker
 
+from docker import DockerClient
 from kafka import KafkaProducer
 from kafka.admin import KafkaAdminClient, NewTopic
 from kafka.errors import NoBrokersAvailable
 
 import pytest
 
 
-BOOTSTRAP_SERVER = "localhost:9092"
-TOPIC_NAME = f"topic-{uuid.uuid4()}"
-CONTAINER_IMAGE_NAME = "kafka-pipeline:1"
+BOOTSTRAP_SERVER = 'localhost:9092'
+TOPIC_NAMES = ['topic1', 'topic2']
+CONTAINER_IMAGE_NAME = 'kafka-pipeline:1'
 
 
-@pytest.fixture(scope="module", autouse=True)
-def kafka_container() -> None:
+@pytest.fixture(scope='module')
+def docker_client() -> DockerClient:
+    # Build a container image for the pipeline.
+    client = docker.from_env()
+    client.images.build(path='./', tag=CONTAINER_IMAGE_NAME)
+    yield client
+
+
+@pytest.fixture(scope='module', autouse=True)
+def kafka_container(docker_client: DockerClient) -> None:
     # Start a containerized Kafka server.
-    docker_client = docker.from_env()
-    container = docker_client.containers.run(
-        "apache/kafka:3.7.0", network_mode="host", detach=True
-    )
+    container = docker_client.containers.run('apache/kafka:3.7.0', network_mode='host', detach=True)
     try:
-        create_topic()
+        create_topics()
+        send_messages(TOPIC_NAMES[0])
+        send_messages(TOPIC_NAMES[1])
         yield
     finally:
         container.stop()
 
 
-def create_topic() -> None:
-    # Try to create a Kafka topic. We might need to wait for the Kafka service to start.
+@pytest.fixture
+def file_name_prefix() -> str:
+    return f'output-{uuid.uuid4()}'
+
+
+def create_topics() -> None:
+    # Try to create Kafka topics. We might need to wait for the Kafka service to start.
     for _ in range(1, 10):
         try:
             client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVER)
             topics = []
-            topics.append(
-                NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1)
-            )
+            topics.append(NewTopic(name=TOPIC_NAMES[0], num_partitions=1, replication_factor=1))
+            topics.append(NewTopic(name=TOPIC_NAMES[1], num_partitions=1, replication_factor=1))
             client.create_topics(topics)
             break
         except NoBrokersAvailable:
             time.sleep(5)
 
 
-def test_read_from_kafka(tmp_path: Path) -> None:
-    file_name_prefix = f"output-{uuid.uuid4()}"
-    file_name = f"{tmp_path}/{file_name_prefix}-00000-of-00001.txt"
-
+def send_messages(topic: str) -> None:
     # Send some messages to Kafka
     producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER)
    for i in range(0, 5):
-        message = f"event-{i}"
-        producer.send(TOPIC_NAME, message.encode())
-
-    # Build a container image for the pipeline.
-    client = docker.from_env()
-    client.images.build(path="./", tag=CONTAINER_IMAGE_NAME)
+        message = f'{topic}-{i}'
+        producer.send(topic, message.encode())
 
-    # Run the pipeline.
-    client.containers.run(
-        image=CONTAINER_IMAGE_NAME,
-        command=f"/pipeline/read_kafka.py --output /out/{file_name_prefix} --bootstrap_server {BOOTSTRAP_SERVER} --topic {TOPIC_NAME}",
-        volumes=["/var/run/docker.sock:/var/run/docker.sock", f"{tmp_path}/:/out"],
-        network_mode="host",
-        entrypoint="python",
-    )
 
+def verify_output(file_name: str, topic: str) -> None:
     # Verify the pipeline wrote the Kafka messages to the output file.
-    with open(file_name, "r") as f:
+    with open(file_name, 'r') as f:
         text = f.read()
     for i in range(0, 5):
-        assert f"event-{i}" in text
+        assert f'{topic}-{i}' in text
+
 
+def test_read_kafka(docker_client: DockerClient, tmp_path: Path, file_name_prefix: str) -> None:
+    topic = TOPIC_NAMES[0]
+
+    # Run the containerized Dataflow pipeline.
+    docker_client.containers.run(
+        image=CONTAINER_IMAGE_NAME,
+        command=f'/pipeline/read_kafka.py --output /out/{file_name_prefix} --bootstrap_server {BOOTSTRAP_SERVER} --topic {topic}',
+        volumes=['/var/run/docker.sock:/var/run/docker.sock', f'{tmp_path}/:/out'],
+        network_mode='host',
+        entrypoint='python')
 
-if __name__ == "__main__":
-    test_read_from_kafka()
+    # Verify the pipeline wrote the Kafka messages to the output file.
+    verify_output(f'{tmp_path}/{file_name_prefix}-00000-of-00001.txt', topic)
+
+
+def test_read_kafka_multi_topic(docker_client: DockerClient, tmp_path: Path, file_name_prefix: str) -> None:
+    # Run the containerized Dataflow pipeline.
+    docker_client.containers.run(
+        image=CONTAINER_IMAGE_NAME,
+        command=f'/pipeline/read_kafka_multi_topic.py --output /out/{file_name_prefix} --bootstrap_server {BOOTSTRAP_SERVER}',
+        volumes=['/var/run/docker.sock:/var/run/docker.sock', f'{tmp_path}/:/out'],
+        network_mode='host',
+        entrypoint='python')
+
+    # Verify the pipeline wrote the Kafka messages to the output files.
+    # This code snippet writes outputs to separate directories based on the topic name.
+    for topic in TOPIC_NAMES:
+        verify_output(f'{tmp_path}/{file_name_prefix}/{topic}/output-00000-of-00001.txt', topic)
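For context, both tests run pipeline scripts (/pipeline/read_kafka.py and /pipeline/read_kafka_multi_topic.py) that are not part of this diff. Below is a minimal sketch of what the multi-topic pipeline might look like, assuming an Apache Beam pipeline built on the cross-language Kafka connector (ReadFromKafka), with one bounded read and one write per topic. All names and parameters here are illustrative, not the actual script.

# Hypothetical sketch of read_kafka_multi_topic.py (not the actual script under test).
import argparse

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

# Assumed to match the topics the test creates.
TOPIC_NAMES = ['topic1', 'topic2']


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument('--output', required=True)
    parser.add_argument('--bootstrap_server', required=True)
    args, beam_args = parser.parse_known_args()

    with beam.Pipeline(options=PipelineOptions(beam_args)) as pipeline:
        for topic in TOPIC_NAMES:
            _ = (
                pipeline
                # Bounded read (max_num_records) so the batch pipeline, and
                # therefore the test's container run, terminates.
                | f'Read {topic}' >> ReadFromKafka(
                    consumer_config={
                        'bootstrap.servers': args.bootstrap_server,
                        'auto.offset.reset': 'earliest',
                    },
                    topics=[topic],
                    max_num_records=5)
                # Elements arrive as (key, value) tuples of bytes; keep the payload.
                | f'Decode {topic}' >> beam.Map(lambda kv: kv[1].decode('utf-8'))
                # One directory per topic, yielding the path the test asserts on:
                # {output}/{topic}/output-00000-of-00001.txt
                | f'Write {topic}' >> WriteToText(
                    file_path_prefix=f'{args.output}/{topic}/output',
                    file_name_suffix='.txt',
                    num_shards=1))


if __name__ == '__main__':
    main()

ReadFromKafka is a cross-language transform backed by the Java Kafka connector, which is presumably why the tests mount /var/run/docker.sock into the pipeline container: Beam can then launch the Java expansion service as a sibling container, while network_mode='host' lets both the pipeline and the broker be reached at localhost:9092.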