Skip to content

Commit e4e090a

Browse files
committed
Add support for sending messages with keys
1 parent 381e9d8 commit e4e090a

File tree

2 files changed

+32
-6
lines changed

2 files changed

+32
-6
lines changed

adc/producer.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ def write(self,
3636
msg: Union[bytes, 'Serializable'],
3737
headers: Optional[Union[dict, list]] = None,
3838
delivery_callback: Optional[DeliveryCallback] = log_delivery_errors,
39-
topic: Optional[str] = None) -> None:
39+
topic: Optional[str] = None,
40+
key: Optional[Union[str, bytes]] = None) -> None:
4041
if isinstance(msg, Serializable):
4142
msg = msg.serialize()
4243
if topic is None:
@@ -48,10 +49,10 @@ def write(self,
4849
"or specify the topic argument to write()")
4950
self.logger.debug("writing message to %s", topic)
5051
if delivery_callback is not None:
51-
self._producer.produce(topic, msg, headers=headers,
52+
self._producer.produce(topic, msg, headers=headers, key=key,
5253
on_delivery=delivery_callback)
5354
else:
54-
self._producer.produce(topic, msg, headers=headers)
55+
self._producer.produce(topic, msg, headers=headers, key=key,)
5556

5657
def flush(self, timeout: timedelta = timedelta(seconds=10)) -> int:
5758
"""Attempt to flush enqueued messages. Return the number of messages still

tests/test_kafka_integration.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import time
44
import unittest
55
from datetime import datetime, timedelta
6-
from typing import List
6+
from typing import List, Optional
77

88
import docker
99
import pytest
@@ -59,6 +59,31 @@ def test_round_trip(self):
5959
self.assertEqual(msg.topic(), topic)
6060
self.assertEqual(msg.value(), b"can you hear me?")
6161

62+
def test_message_with_key(self):
63+
"""Try writing a message into the Kafka broker, and try pulling the same
64+
message back out.
65+
66+
"""
67+
topic = "test_message_with_key"
68+
# Push one message in...
69+
simple_write_msg(self.kafka, topic, "can you hear me?", key="test_msg")
70+
# ... and pull it back out.
71+
consumer = adc.consumer.Consumer(adc.consumer.ConsumerConfig(
72+
broker_urls=[self.kafka.address],
73+
group_id="test_consumer",
74+
auth=self.kafka.auth,
75+
))
76+
consumer.subscribe(topic)
77+
stream = consumer.stream()
78+
79+
msg = next(stream)
80+
if msg.error() is not None:
81+
raise Exception(msg.error())
82+
83+
self.assertEqual(msg.topic(), topic)
84+
self.assertEqual(msg.value(), b"can you hear me?")
85+
self.assertEqual(msg.key(), b"test_msg")
86+
6287
def test_reset_to_end(self):
6388
# Write a few messages.
6489
topic = "test_reset_to_end"
@@ -569,13 +594,13 @@ def get_or_create_docker_network(self):
569594
return self.docker_client.networks.create(name="adc-integration-test")
570595

571596

572-
def simple_write_msg(conn: KafkaDockerConnection, topic: str, msg: str):
597+
def simple_write_msg(conn: KafkaDockerConnection, topic: str, msg: str, key: Optional[str] = None):
573598
producer = adc.producer.Producer(adc.producer.ProducerConfig(
574599
broker_urls=[conn.address],
575600
topic=topic,
576601
auth=conn.auth,
577602
))
578-
producer.write(msg)
603+
producer.write(msg, key=key)
579604
producer.flush()
580605

581606

0 commit comments

Comments
 (0)