Skip to content

Commit 93f10ef

Browse files
Support get the producer name of a message (#524)
1 parent a03eb92 commit 93f10ef

File tree

7 files changed

+46
-3
lines changed

7 files changed

+46
-3
lines changed

include/pulsar/Message.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,13 @@ class PULSAR_PUBLIC Message {
195195
*/
196196
const std::string& getSchemaVersion() const;
197197

198+
/**
199+
* Get the producer name which produced this message.
200+
*
201+
* @return the producer name or empty string if not available
202+
*/
203+
const std::string& getProducerName() const noexcept;
204+
198205
bool operator==(const Message& msg) const;
199206

200207
protected:

include/pulsar/c/message.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,12 @@ PULSAR_PUBLIC const char *pulsar_message_get_schemaVersion(pulsar_message_t *mes
215215

216216
PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message, const char *schemaVersion);
217217

218+
/**
219+
* Returns the producer name which produced this message. The pointer points to an internal string, so the
220+
* caller should not free it.
221+
*/
222+
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);
223+
218224
#ifdef __cplusplus
219225
}
220226
#endif

lib/Message.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,13 @@ uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublish
213213

214214
uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
215215

216+
const std::string& Message::getProducerName() const noexcept {
217+
if (!impl_) {
218+
return emptyString;
219+
}
220+
return impl_->metadata.producer_name();
221+
}
222+
216223
bool Message::operator==(const Message& msg) const { return getMessageId() == msg.getMessageId(); }
217224

218225
KeyValue Message::getKeyValueData() const { return KeyValue(impl_->keyValuePtr); }

lib/c/c_Message.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,3 +140,7 @@ const char *pulsar_message_get_schemaVersion(pulsar_message_t *message) {
140140
int pulsar_message_has_schema_version(pulsar_message_t *message) {
141141
return message->message.hasSchemaVersion();
142142
}
143+
144+
const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
145+
return message->message.getProducerName().c_str();
146+
}

tests/BasicEndToEndTest.cc

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,20 @@ TEST(BasicEndToEndTest, testProduceConsume) {
242242
Message receivedMsg;
243243
consumer.receive(receivedMsg);
244244
ASSERT_EQ(content, receivedMsg.getDataAsString());
245+
ASSERT_FALSE(receivedMsg.getProducerName().empty());
246+
ASSERT_EQ(ResultOk, producer.close());
247+
248+
ProducerConfiguration conf;
249+
conf.setProducerName("test-producer");
250+
ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
251+
producer.send(MessageBuilder().setContent("msg-2-content").build());
252+
consumer.receive(receivedMsg);
253+
ASSERT_EQ("msg-2-content", receivedMsg.getDataAsString());
254+
ASSERT_EQ("test-producer", receivedMsg.getProducerName());
255+
consumer.acknowledge(receivedMsg);
245256
ASSERT_EQ(ResultOk, consumer.unsubscribe());
246257
ASSERT_EQ(ResultOk, consumer.close());
247-
ASSERT_EQ(ResultOk, producer.close());
258+
ASSERT_EQ(ResultOk, consumer.close());
248259
ASSERT_EQ(ResultOk, client.close());
249260
}
250261

tests/MessageTest.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ TEST(MessageTest, testMessageContents) {
4242
ASSERT_NE(myContents.c_str(), (char*)msg.getData());
4343
ASSERT_EQ(myContents, msg.getDataAsString());
4444
ASSERT_EQ(std::string("mycontents").length(), msg.getLength());
45+
ASSERT_TRUE(msg.getProducerName().empty());
4546
}
4647

4748
TEST(MessageTest, testAllocatedContents) {

tests/c/c_BasicEndToEndTest.cc

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ struct receive_ctx {
3434
pulsar_result result;
3535
pulsar_consumer_t *consumer;
3636
char *data;
37+
char *producer_name;
3738
std::promise<void> *promise;
3839
};
3940

@@ -57,6 +58,9 @@ static void receive_callback(pulsar_result async_result, pulsar_message_t *msg,
5758
const char *data = (const char *)pulsar_message_get_data(msg);
5859
receive_ctx->data = (char *)malloc(strlen(data) * sizeof(char) + 1);
5960
strcpy(receive_ctx->data, data);
61+
const char *producer_name = pulsar_message_get_producer_name(msg);
62+
receive_ctx->producer_name = (char *)malloc(strlen(producer_name) * sizeof(char) + 1);
63+
strcpy(receive_ctx->producer_name, producer_name);
6064
}
6165
receive_ctx->promise->set_value();
6266
pulsar_message_free(msg);
@@ -71,6 +75,7 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
7175
pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
7276

7377
pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
78+
pulsar_producer_configuration_set_producer_name(producer_conf, "test-producer");
7479
pulsar_producer_t *producer;
7580
pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
7681
ASSERT_EQ(pulsar_result_Ok, result);
@@ -101,12 +106,14 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
101106
// receive asynchronously
102107
std::promise<void> receive_promise;
103108
std::future<void> receive_future = receive_promise.get_future();
104-
struct receive_ctx receive_ctx = {pulsar_result_UnknownError, consumer, NULL, &receive_promise};
109+
struct receive_ctx receive_ctx = {pulsar_result_UnknownError, consumer, NULL, NULL, &receive_promise};
105110
pulsar_consumer_receive_async(consumer, receive_callback, &receive_ctx);
106111
receive_future.get();
107112
ASSERT_EQ(pulsar_result_Ok, receive_ctx.result);
113+
ASSERT_STREQ("test-producer", receive_ctx.producer_name);
108114
ASSERT_STREQ(content, receive_ctx.data);
109-
delete receive_ctx.data;
115+
free(receive_ctx.data);
116+
free(receive_ctx.producer_name);
110117

111118
ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_unsubscribe(consumer));
112119
ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_close(consumer));

0 commit comments

Comments
 (0)