Skip to content

Commit eea59bb

Browse files
authored
[fix] Fix consumer doesn't acknowledge all chunk message Ids (apache#321)
1 parent f2c580b commit eea59bb

File tree

9 files changed

+100
-62
lines changed

9 files changed

+100
-62
lines changed

lib/AckGroupingTracker.cc

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
#include <atomic>
2323
#include <limits>
24+
#include <set>
2425

2526
#include "BitSet.h"
27+
#include "ChunkMessageIdImpl.h"
2628
#include "ClientConnection.h"
2729
#include "Commands.h"
2830
#include "LogUtils.h"
@@ -42,6 +44,17 @@ void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback c
4244
}
4345
return;
4446
}
47+
if (ackType == CommandAck_AckType_Individual) {
48+
// If it's an individual ack, we need to acknowledge every message ID contained in a chunked message ID.
49+
// If it's a cumulative ack, we only need to ack the last message ID of a chunked message.
50+
// ChunkMessageIdImpl exposes the last chunk's message ID by default, so we don't need to handle it.
51+
if (auto chunkMessageId =
52+
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
53+
auto msgIdList = chunkMessageId->getChunkedMessageIds();
54+
doImmediateAck(std::set<MessageId>(msgIdList.begin(), msgIdList.end()), callback);
55+
return;
56+
}
57+
}
4558
const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
4659
if (waitResponse_) {
4760
const auto requestId = requestIdSupplier_();
@@ -84,29 +97,41 @@ void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, Resul
8497
return;
8598
}
8699

100+
std::set<MessageId> ackMsgIds;
101+
102+
for (const auto& msgId : msgIds) {
103+
if (auto chunkMessageId =
104+
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
105+
auto msgIdList = chunkMessageId->getChunkedMessageIds();
106+
ackMsgIds.insert(msgIdList.begin(), msgIdList.end());
107+
} else {
108+
ackMsgIds.insert(msgId);
109+
}
110+
}
111+
87112
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
88113
if (waitResponse_) {
89114
const auto requestId = requestIdSupplier_();
90-
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
115+
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId)
91116
.addListener([callback](Result result, const ResponseData&) {
92117
if (callback) {
93118
callback(result);
94119
}
95120
});
96121
} else {
97-
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));
122+
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, ackMsgIds));
98123
if (callback) {
99124
callback(ResultOk);
100125
}
101126
}
102127
} else {
103-
auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
128+
auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
104129
auto wrappedCallback = [callback, count](Result result) {
105130
if (--*count == 0 && callback) {
106131
callback(result);
107132
}
108133
};
109-
for (auto&& msgId : msgIds) {
134+
for (auto&& msgId : ackMsgIds) {
110135
doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
111136
}
112137
}

lib/ChunkMessageIdImpl.h

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,19 @@ class ChunkMessageIdImpl;
2828
typedef std::shared_ptr<ChunkMessageIdImpl> ChunkMessageIdImplPtr;
2929
class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this<ChunkMessageIdImpl> {
3030
public:
31-
ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>()) {}
32-
33-
void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; }
34-
35-
void setLastChunkMessageId(const MessageId& msgId) {
36-
this->ledgerId_ = msgId.ledgerId();
37-
this->entryId_ = msgId.entryId();
38-
this->partition_ = msgId.partition();
31+
explicit ChunkMessageIdImpl(std::vector<MessageId>&& chunkedMessageIds)
32+
: chunkedMessageIds_(std::move(chunkedMessageIds)) {
33+
auto lastChunkMsgId = chunkedMessageIds_.back();
34+
this->ledgerId_ = lastChunkMsgId.ledgerId();
35+
this->entryId_ = lastChunkMsgId.entryId();
36+
this->partition_ = lastChunkMsgId.partition();
3937
}
4038

41-
std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { return firstChunkMsgId_; }
39+
const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
4240

4341
MessageId build() { return MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }
4442

4543
private:
46-
std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
44+
std::vector<MessageId> chunkedMessageIds_;
4745
};
4846
} // namespace pulsar

lib/Commands.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -583,9 +583,9 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me
583583

584584
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
585585
if (chunkMsgId) {
586-
auto firstId = chunkMsgId->getFirstChunkMessageId();
587-
messageIdData.set_ledgerid(firstId->ledgerId_);
588-
messageIdData.set_entryid(firstId->entryId_);
586+
const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
587+
messageIdData.set_ledgerid(firstId.ledgerId());
588+
messageIdData.set_entryid(firstId.entryId());
589589
} else {
590590
messageIdData.set_ledgerid(messageId.ledgerId());
591591
messageIdData.set_entryid(messageId.entryId());

lib/ConsumerImpl.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -479,10 +479,7 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
479479
return boost::none;
480480
}
481481

482-
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
483-
chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
484-
chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
485-
messageId = chunkMsgId->build();
482+
messageId = std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();
486483

487484
LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
488485
<< ", sequenceId: " << metadata.sequence_id());
@@ -1174,6 +1171,9 @@ std::pair<MessageId, bool> ConsumerImpl::prepareIndividualAck(const MessageId& m
11741171
(batchSize > 0) ? batchSize : 1);
11751172
unAckedMessageTrackerPtr_->remove(messageId);
11761173
possibleSendToDeadLetterTopicMessages_.remove(messageId);
1174+
if (std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageIdImpl)) {
1175+
return std::make_pair(messageId, true);
1176+
}
11771177
return std::make_pair(discardBatch(messageId), true);
11781178
} else if (config_.isBatchIndexAckEnabled()) {
11791179
return std::make_pair(messageId, true);

lib/ConsumerImpl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ class ConsumerImpl : public ConsumerImplBase {
270270

271271
const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }
272272

273+
std::vector<MessageId> moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); }
274+
273275
long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }
274276

275277
friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) {
@@ -292,8 +294,6 @@ class ConsumerImpl : public ConsumerImplBase {
292294
// concurrently on the topic) then it guards against broken chunked message which was not fully published
293295
const bool autoAckOldestChunkedMessageOnQueueFull_;
294296

295-
// The key is UUID, value is the associated ChunkedMessageCtx of the chunked message.
296-
std::unordered_map<std::string, ChunkedMessageCtx> chunkedMessagesMap_;
297297
// This list contains all the keys of `chunkedMessagesMap_`; each key is a UUID that identifies a pending
298298
// chunked message. Once the number of pending chunked messages exceeds the limit, the oldest UUIDs and
299299
// the associated ChunkedMessageCtx will be removed.

lib/MessageId.cc

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,11 @@ void MessageId::serialize(std::string& result) const {
7676
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
7777
if (chunkMsgId) {
7878
proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
79-
auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
80-
firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
81-
firstChunkIdData.set_entryid(firstChunkId->entryId_);
79+
const auto& firstChunkId = chunkMsgId->getChunkedMessageIds().front();
80+
firstChunkIdData.set_ledgerid(firstChunkId.ledgerId());
81+
firstChunkIdData.set_entryid(firstChunkId.entryId());
8282
if (chunkMsgId->partition_ != -1) {
83-
firstChunkIdData.set_partition(firstChunkId->partition_);
83+
firstChunkIdData.set_partition(firstChunkId.partition());
8484
}
8585
}
8686

@@ -99,9 +99,8 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) {
9999
MessageId msgId = MessageIdBuilder::from(idData).build();
100100

101101
if (idData.has_first_chunk_message_id()) {
102-
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
103-
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
104-
chunkMsgId->setLastChunkMessageId(msgId);
102+
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(
103+
std::vector<MessageId>({MessageIdBuilder::from(idData.first_chunk_message_id()).build(), msgId}));
105104
return chunkMsgId->build();
106105
}
107106

@@ -121,9 +120,9 @@ int32_t MessageId::batchSize() const { return impl_->batchSize_; }
121120
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
122121
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
123122
if (chunkMsgId) {
124-
auto firstId = chunkMsgId->getFirstChunkMessageId();
125-
s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ','
126-
<< firstId->batchIndex_ << ");";
123+
const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
124+
s << '(' << firstId.ledgerId() << ',' << firstId.entryId() << ',' << firstId.partition() << ','
125+
<< firstId.batchIndex() << ");";
127126
}
128127
s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
129128
<< messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';

lib/OpSendMsg.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ struct SendArguments {
4545
SendArguments& operator=(const SendArguments&) = delete;
4646
};
4747

48+
typedef std::shared_ptr<std::vector<MessageId>> ChunkMessageIdListPtr;
49+
4850
struct OpSendMsg {
4951
const Result result;
5052
const int32_t chunkId;
@@ -54,7 +56,7 @@ struct OpSendMsg {
5456
const boost::posix_time::ptime timeout;
5557
const SendCallback sendCallback;
5658
std::vector<std::function<void(Result)>> trackerCallbacks;
57-
ChunkMessageIdImplPtr chunkedMessageId;
59+
ChunkMessageIdListPtr chunkMessageIdList;
5860
// Use shared_ptr here because producer might resend the message with the same arguments
5961
const std::shared_ptr<SendArguments> sendArgs;
6062

@@ -89,7 +91,7 @@ struct OpSendMsg {
8991
sendArgs(nullptr) {}
9092

9193
OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize,
92-
int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdImplPtr chunkedMessageId,
94+
int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdListPtr chunkMessageIdList,
9395
uint64_t producerId, SharedBuffer payload)
9496
: result(ResultOk),
9597
chunkId(metadata.chunk_id()),
@@ -98,7 +100,7 @@ struct OpSendMsg {
98100
messagesSize(messagesSize),
99101
timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)),
100102
sendCallback(std::move(callback)),
101-
chunkedMessageId(chunkedMessageId),
103+
chunkMessageIdList(std::move(chunkMessageIdList)),
102104
sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {}
103105
};
104106

lib/ProducerImpl.cc

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -572,14 +572,14 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
572572
}
573573
} else {
574574
const bool sendChunks = (totalChunks > 1);
575+
ChunkMessageIdListPtr chunkMessageIdList;
575576
if (sendChunks) {
576577
msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId));
577578
msgMetadata.set_num_chunks_from_msg(totalChunks);
578579
msgMetadata.set_total_chunk_msg_size(compressedSize);
580+
chunkMessageIdList = std::make_shared<std::vector<MessageId>>();
579581
}
580582

581-
auto chunkMessageId = totalChunks > 1 ? std::make_shared<ChunkMessageIdImpl>() : nullptr;
582-
583583
int beginIndex = 0;
584584
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
585585
if (sendChunks) {
@@ -596,7 +596,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
596596
}
597597

598598
auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(),
599-
(chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageId,
599+
(chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageIdList,
600600
producerId_, encryptedPayload);
601601

602602
if (!chunkingEnabled_) {
@@ -887,7 +887,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
887887
return true;
888888
}
889889

890-
const auto& op = *pendingMessagesQueue_.front();
890+
auto& op = *pendingMessagesQueue_.front();
891891
if (op.result != ResultOk) {
892892
LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and "
893893
<< rawMessageId);
@@ -911,13 +911,12 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
911911
// Message was persisted correctly
912912
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
913913

914-
if (op.chunkedMessageId) {
914+
if (op.chunkMessageIdList) {
915915
// Handling the chunk message id.
916-
if (op.chunkId == 0) {
917-
op.chunkedMessageId->setFirstChunkMessageId(messageId);
918-
} else if (op.chunkId == op.numChunks - 1) {
919-
op.chunkedMessageId->setLastChunkMessageId(messageId);
920-
messageId = op.chunkedMessageId->build();
916+
op.chunkMessageIdList->push_back(messageId);
917+
if (op.chunkId == op.numChunks - 1) {
918+
auto chunkedMessageId = std::make_shared<ChunkMessageIdImpl>(std::move(*op.chunkMessageIdList));
919+
messageId = chunkedMessageId->build();
921920
}
922921
}
923922

tests/MessageChunkingTest.cc

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> {
8181
}
8282

8383
void createConsumer(const std::string& topic, Consumer& consumer) {
84-
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
84+
ConsumerConfiguration conf;
85+
conf.setBrokerConsumerStatsCacheTimeInMs(1000);
86+
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer));
8587
}
8688

8789
void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) {
@@ -118,9 +120,6 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
118120
for (int i = 0; i < numMessages; i++) {
119121
MessageId messageId;
120122
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
121-
auto chunkMsgId =
122-
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
123-
ASSERT_TRUE(chunkMsgId);
124123
LOG_INFO("Send " << i << " to " << messageId);
125124
sendMessageIds.emplace_back(messageId);
126125
}
@@ -134,19 +133,35 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
134133
ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
135134
ASSERT_EQ(msg.getMessageId().batchSize(), 0);
136135
auto messageId = msg.getMessageId();
137-
auto chunkMsgId =
138-
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
139-
ASSERT_TRUE(chunkMsgId);
140136
receivedMessageIds.emplace_back(messageId);
137+
consumer.acknowledge(messageId);
141138
}
142139
ASSERT_EQ(receivedMessageIds, sendMessageIds);
143-
ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId());
140+
for (int i = 0; i < sendMessageIds.size(); ++i) {
141+
auto sendChunkMsgId =
142+
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(sendMessageIds[i]));
143+
ASSERT_TRUE(sendChunkMsgId);
144+
auto receiveChunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(
145+
PulsarFriend::getMessageIdImpl(receivedMessageIds[i]));
146+
ASSERT_TRUE(receiveChunkMsgId);
147+
ASSERT_EQ(sendChunkMsgId->getChunkedMessageIds(), receiveChunkMsgId->getChunkedMessageIds());
148+
}
144149
ASSERT_GT(receivedMessageIds.back().entryId(), numMessages);
145150

146151
// Verify the cache has been cleared
147152
auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
148153
ASSERT_EQ(chunkedMessageCache.size(), 0);
149154

155+
BrokerConsumerStats consumerStats;
156+
waitUntil(
157+
std::chrono::seconds(10),
158+
[&] {
159+
return consumer.getBrokerConsumerStats(consumerStats) == ResultOk &&
160+
consumerStats.getMsgBacklog() == 0;
161+
},
162+
1000);
163+
ASSERT_EQ(consumerStats.getMsgBacklog(), 0);
164+
150165
producer.close();
151166
consumer.close();
152167
}
@@ -317,9 +332,9 @@ TEST_P(MessageChunkingTest, testSeekChunkMessages) {
317332
TEST(ChunkMessageIdTest, testSetChunkMessageId) {
318333
MessageId msgId;
319334
{
320-
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
321-
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build());
322-
chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build());
335+
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(
336+
std::vector<MessageId>({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(),
337+
MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()}));
323338
msgId = chunkMsgId->build();
324339
// Test the destructor of the underlying message id should also work for the generated messageId.
325340
}
@@ -332,13 +347,13 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) {
332347
ASSERT_EQ(deserializedMsgId.entryId(), 5);
333348
ASSERT_EQ(deserializedMsgId.partition(), 6);
334349

335-
auto chunkMsgId =
350+
const auto& chunkMsgId =
336351
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(deserializedMsgId));
337352
ASSERT_TRUE(chunkMsgId);
338-
auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId();
339-
ASSERT_EQ(firstChunkMsgId->ledgerId_, 1);
340-
ASSERT_EQ(firstChunkMsgId->entryId_, 2);
341-
ASSERT_EQ(firstChunkMsgId->partition_, 3);
353+
auto firstChunkMsgId = chunkMsgId->getChunkedMessageIds().front();
354+
ASSERT_EQ(firstChunkMsgId.ledgerId(), 1);
355+
ASSERT_EQ(firstChunkMsgId.entryId(), 2);
356+
ASSERT_EQ(firstChunkMsgId.partition(), 3);
342357
}
343358

344359
// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P

0 commit comments

Comments
 (0)