Skip to content

Commit 4e9d7bc

Browse files
committed
Add test to verify flush when close
1 parent 3744b21 commit 4e9d7bc

File tree

1 file changed

+28
-0
lines changed

1 file changed

+28
-0
lines changed

tests/AcknowledgeTest.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,34 @@ TEST_F(AcknowledgeTest, testAckReceiptEnabled) {
375375
client.close();
376376
}
377377

378+
TEST_F(AcknowledgeTest, testCloseConsumer) {
379+
Client client(lookupUrl);
380+
const auto topic = "test-close-consumer" + unique_str();
381+
Producer producer;
382+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
383+
ConsumerConfiguration consumerConfig;
384+
consumerConfig.setAckGroupingTimeMs(60000);
385+
consumerConfig.setAckGroupingMaxSize(100);
386+
Consumer consumer;
387+
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumerConfig, consumer));
388+
389+
producer.send(MessageBuilder().setContent("msg-0").build());
390+
Message msg;
391+
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
392+
consumer.acknowledgeAsync(
393+
msg, nullptr); // it just adds the msg id to the pending ack list due to the ack grouping configs
394+
consumer.close(); // it will flush the pending ACK and prevent any further ack
395+
ASSERT_EQ(ResultAlreadyClosed, consumer.acknowledge(msg));
396+
ASSERT_EQ(ResultAlreadyClosed, consumer.acknowledgeCumulative(msg));
397+
ASSERT_EQ(ResultAlreadyClosed, consumer.acknowledge(std::vector<MessageId>{msg.getMessageId()}));
398+
399+
producer.send(MessageBuilder().setContent("msg-1").build());
400+
// Recreate the consumer to verify the first message is acknowledged
401+
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumerConfig, consumer));
402+
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
403+
ASSERT_EQ("msg-1", msg.getDataAsString());
404+
}
405+
378406
INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest,
379407
testing::Combine(testing::Values(100, 0), testing::Values(true, false)),
380408
[](const testing::TestParamInfo<std::tuple<int, bool>>& info) {

0 commit comments

Comments
 (0)