@@ -34,6 +34,7 @@ struct receive_ctx {
3434 pulsar_result result;
3535 pulsar_consumer_t *consumer;
3636 char *data;
37+ char *producer_name;
3738 std::promise<void > *promise;
3839};
3940
@@ -57,6 +58,9 @@ static void receive_callback(pulsar_result async_result, pulsar_message_t *msg,
5758 const char *data = (const char *)pulsar_message_get_data (msg);
5859 receive_ctx->data = (char *)malloc (strlen (data) * sizeof (char ) + 1 );
5960 strcpy (receive_ctx->data , data);
61+ const char *producer_name = pulsar_message_get_producer_name (msg);
62+ receive_ctx->producer_name = (char *)malloc (strlen (producer_name) * sizeof (char ) + 1 );
63+ strcpy (receive_ctx->producer_name , producer_name);
6064 }
6165 receive_ctx->promise ->set_value ();
6266 pulsar_message_free (msg);
@@ -71,6 +75,7 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
7175 pulsar_client_t *client = pulsar_client_create (lookup_url, conf);
7276
7377 pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create ();
78+ pulsar_producer_configuration_set_producer_name (producer_conf, " test-producer" );
7479 pulsar_producer_t *producer;
7580 pulsar_result result = pulsar_client_create_producer (client, topic_name, producer_conf, &producer);
7681 ASSERT_EQ (pulsar_result_Ok, result);
@@ -101,12 +106,14 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
101106 // receive asynchronously
102107 std::promise<void > receive_promise;
103108 std::future<void > receive_future = receive_promise.get_future ();
104- struct receive_ctx receive_ctx = {pulsar_result_UnknownError, consumer, NULL , &receive_promise};
109+ struct receive_ctx receive_ctx = {pulsar_result_UnknownError, consumer, NULL , NULL , &receive_promise};
105110 pulsar_consumer_receive_async (consumer, receive_callback, &receive_ctx);
106111 receive_future.get ();
107112 ASSERT_EQ (pulsar_result_Ok, receive_ctx.result );
113+ ASSERT_STREQ (" test-producer" , receive_ctx.producer_name );
108114 ASSERT_STREQ (content, receive_ctx.data );
109- delete receive_ctx.data ;
115+ free (receive_ctx.data );
116+ free (receive_ctx.producer_name );
110117
111118 ASSERT_EQ (pulsar_result_Ok, pulsar_consumer_unsubscribe (consumer));
112119 ASSERT_EQ (pulsar_result_Ok, pulsar_consumer_close (consumer));
0 commit comments