Skip to content

Commit 16b620f

Browse files
committed
Support Message.producer_name()
1 parent 879cdfd commit 16b620f

File tree

3 files changed

+11
-2
lines changed

3 files changed

+11
-2
lines changed

pulsar/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,12 @@ def schema_version(self):
244244
"""
245245
return self._message.schema_version()
246246

247+
def producer_name(self) -> str:
248+
"""
249+
Get the producer name which produced this message
250+
"""
251+
return self._message.producer_name()
252+
247253
@staticmethod
248254
def _wrap(_message):
249255
self = Message()

src/message.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ void export_message(py::module_& m) {
105105
.def("topic_name", &Message::getTopicName, return_value_policy::copy)
106106
.def("redelivery_count", &Message::getRedeliveryCount)
107107
.def("int_schema_version", &Message::getLongSchemaVersion)
108-
.def("schema_version", &Message::getSchemaVersion, return_value_policy::copy);
108+
.def("schema_version", &Message::getSchemaVersion, return_value_policy::copy)
109+
.def("producer_name", &Message::getProducerName, return_value_policy::copy);
109110

110111
MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload,
111112
uint32_t batchSize) = &MessageBatch::parseFrom;

tests/pulsar_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,12 +238,14 @@ def test_producer_is_connected(self):
238238
def test_producer_consumer(self):
239239
client = Client(self.serviceUrl)
240240
consumer = client.subscribe("my-python-topic-producer-consumer", "my-sub", consumer_type=ConsumerType.Shared)
241-
producer = client.create_producer("my-python-topic-producer-consumer")
241+
producer = client.create_producer("my-python-topic-producer-consumer",
242+
producer_name="my-producer")
242243
producer.send(b"hello")
243244

244245
msg = consumer.receive(TM)
245246
self.assertTrue(msg)
246247
self.assertEqual(msg.data(), b"hello")
248+
self.assertEqual(msg.producer_name(), "my-producer")
247249

248250
with self.assertRaises(pulsar.Timeout):
249251
consumer.receive(100)

0 commit comments

Comments
 (0)