Skip to content

Commit ad79bec

Browse files
authored
[fix] Fix wrong behavior when removing the chunkedMessageCtx (#110)
Fixes #104 ### Motivation Currently, the consumer acks the latest message when the number of pending chunked messages exceeds maxPendingChunkedMessage. This is wrong behavior and may lead to unexpected data loss. This PR also fixes several issues related to maxPendingChunkedMessage: https://github.com/apache/pulsar-client-cpp/blob/1f7fdb86c409c1486d160528137f10ce07dcf3b2/lib/ConsumerImpl.cc#L387-L407 In the current logic, there are two `putIfAbsent` operations, and they are confusing. If a new chunked message is received, it is added to the chunkedMessageCache. But if the size of the cache reaches maxPendingChunkedMessage, at least one ctx is removed from the cache, because the number of entries removed is `chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1`. The message is then put into the cache again. This can lead to unnecessary ctx buffer memory allocations. Here are some key points of this issue: <img width="961" alt="image" src="https://user-images.githubusercontent.com/16974619/200777662-59c2e262-fb68-4f03-b1b0-8a312cb1ac58.png"> ### Modifications * Fix the consumer acknowledging the wrong message when the pending chunked messages exceed maxPendingChunkedMessage * Fix the wrong behavior when removing a ctx from the chunkedMessageCache.
1 parent 5a27ba3 commit ad79bec

File tree

3 files changed

+94
-23
lines changed

3 files changed

+94
-23
lines changed

lib/ConsumerImpl.cc

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,19 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
322322
}
323323
}
324324

325+
/**
 * Dispose of a single chunk that belongs to a chunked message being dropped.
 *
 * @param uuid       uuid of the chunked message the chunk belongs to; only used in the warning log
 * @param messageId  id of the chunk to discard
 * @param autoAck    when true the chunk is acknowledged individually; when false it is handed to
 *                   trackMessage() instead
 */
void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck) {
326+
if (autoAck) {
327+
// uuid and messageId are captured by value because the callback runs after this call returns.
doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) {
328+
if (result != ResultOk) {
329+
LOG_WARN("Failed to acknowledge discarded chunk, uuid: " << uuid
330+
<< ", messageId: " << messageId);
331+
}
332+
});
333+
} else {
334+
// NOTE(review): presumably this leaves the chunk unacknowledged and tracked so that it can be
// redelivered later -- confirm against trackMessage().
trackMessage(messageId);
335+
}
336+
}
337+
325338
void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
326339
checkExpiredChunkedTimer_->expires_from_now(
327340
boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
@@ -347,12 +360,7 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
347360
}
348361
for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
349362
LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId);
350-
doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) {
351-
if (result != ResultOk) {
352-
LOG_WARN("Failed to acknowledge discarded chunk, uuid: "
353-
<< uuid << ", messageId: " << msgId);
354-
}
355-
});
363+
discardChunkMessages(uuid, msgId, true);
356364
}
357365
return true;
358366
});
@@ -383,29 +391,18 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
383391

384392
auto it = chunkedMessageCache_.find(uuid);
385393

386-
if (chunkId == 0) {
387-
if (it == chunkedMessageCache_.end()) {
388-
it = chunkedMessageCache_.putIfAbsent(
389-
uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
390-
}
394+
if (chunkId == 0 && it == chunkedMessageCache_.end()) {
391395
if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
392396
chunkedMessageCache_.removeOldestValues(
393397
chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1,
394-
[this, messageId](const std::string& uuid, const ChunkedMessageCtx& ctx) {
395-
if (autoAckOldestChunkedMessageOnQueueFull_) {
396-
doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) {
397-
if (result != ResultOk) {
398-
LOG_WARN("Failed to acknowledge discarded chunk, uuid: "
399-
<< uuid << ", messageId: " << messageId);
400-
}
401-
});
402-
} else {
403-
trackMessage(messageId);
398+
[this](const std::string& uuid, const ChunkedMessageCtx& ctx) {
399+
for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
400+
discardChunkMessages(uuid, msgId, autoAckOldestChunkedMessageOnQueueFull_);
404401
}
405402
});
406-
it = chunkedMessageCache_.putIfAbsent(
407-
uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
408403
}
404+
it = chunkedMessageCache_.putIfAbsent(
405+
uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
409406
}
410407

411408
auto& chunkedMsgCtx = it->second;

lib/ConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ class ConsumerImpl : public ConsumerImplBase {
307307
std::atomic_bool expireChunkMessageTaskScheduled_{false};
308308

309309
void triggerCheckExpiredChunkedTimer();
310+
void discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck);
310311

311312
/**
312313
* Process a chunk. If the chunk is the last chunk of a message, concatenate all buffered chunks into the

tests/MessageChunkingTest.cc

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,79 @@ TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) {
180180
consumer.close();
181181
}
182182

183+
// Regression test for the handling of maxPendingChunkedMessage: with the limit set to 1 and
// auto-ack of the oldest chunked message enabled, an incomplete message (uuid "0") is followed by
// a complete two-chunk message (uuid "1"). The complete message must be delivered, and must be
// delivered again after a redelivery request, i.e. it must not have been acknowledged by mistake.
TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) {
184+
// Only run for the "None" parameter value; the other parameterizations return immediately.
if (toString(GetParam()) != "None") {
185+
return;
186+
}
187+
const std::string topic = "MessageChunkingTest-testMaxPendingChunkMessages-" + toString(GetParam()) +
188+
std::to_string(time(nullptr));
189+
Consumer consumer;
190+
ConsumerConfiguration consumerConf;
191+
consumerConf.setMaxPendingChunkedMessage(1);
192+
consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true);
193+
createConsumer(topic, consumer, consumerConf);
194+
Producer producer;
195+
createProducer(topic, producer);
196+
197+
// First chunk (chunk id 0 of 2) of the message with uuid "0"; its second chunk is sent much later.
auto msg = MessageBuilder().setContent("chunk-0-0|").build();
198+
auto& metadata = PulsarFriend::getMessageMetadata(msg);
199+
metadata.set_num_chunks_from_msg(2);
200+
metadata.set_chunk_id(0);
201+
metadata.set_uuid("0");
202+
metadata.set_total_chunk_msg_size(100);
203+
204+
producer.send(msg);
205+
206+
// First chunk of the message with uuid "1". With a limit of one pending chunked message this is
// expected to push the pending context of uuid "0" out of the consumer's cache.
auto msg2 = MessageBuilder().setContent("chunk-1-0|").build();
207+
auto& metadata2 = PulsarFriend::getMessageMetadata(msg2);
208+
metadata2.set_num_chunks_from_msg(2);
209+
metadata2.set_uuid("1");
210+
metadata2.set_chunk_id(0);
211+
metadata2.set_total_chunk_msg_size(100);
212+
213+
producer.send(msg2);
214+
215+
// Second and final chunk of the message with uuid "1".
auto msg3 = MessageBuilder().setContent("chunk-1-1|").build();
216+
auto& metadata3 = PulsarFriend::getMessageMetadata(msg3);
217+
metadata3.set_num_chunks_from_msg(2);
218+
metadata3.set_uuid("1");
219+
metadata3.set_chunk_id(1);
220+
metadata3.set_total_chunk_msg_size(100);
221+
222+
producer.send(msg3);
223+
224+
Message receivedMsg;
225+
ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 3000));
226+
ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-1-0|chunk-1-1|");
227+
228+
consumer.redeliverUnacknowledgedMessages();
229+
230+
// Older versions of the code could acknowledge the wrong message (the latest one) at this point.
231+
// Receiving the same message again after redelivery verifies that this no longer happens.
232+
Message receivedMsg2;
233+
ASSERT_EQ(ResultOk, consumer.receive(receivedMsg2, 3000));
234+
ASSERT_EQ(receivedMsg2.getDataAsString(), "chunk-1-0|chunk-1-1|");
235+
236+
consumer.acknowledge(receivedMsg2);
237+
238+
consumer.redeliverUnacknowledgedMessages();
239+
// Second chunk of the message with uuid "0", whose first chunk was sent at the start of the test.
auto msg4 = MessageBuilder().setContent("chunk-0-1|").build();
240+
auto& metadata4 = PulsarFriend::getMessageMetadata(msg4);
241+
metadata4.set_num_chunks_from_msg(2);
242+
metadata4.set_uuid("0");
243+
metadata4.set_chunk_id(1);
244+
metadata4.set_total_chunk_msg_size(100);
245+
246+
producer.send(msg4);
247+
248+
// The chunk chunk-0-0 should already have been acknowledged, so it can no longer be received.
// NOTE(review): the result of this receive() is not asserted; presumably a timeout is the expected
// outcome -- confirm and consider asserting it.
249+
Message receivedMsg3;
250+
consumer.receive(receivedMsg3, 3000);
251+
252+
producer.close();
253+
consumer.close();
254+
}
255+
183256
// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P
184257
INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest,
185258
::testing::Values(CompressionNone, CompressionLZ4, CompressionZLib, CompressionZSTD,

0 commit comments

Comments
 (0)