Skip to content

Commit 2980bbe

Browse files
authored
[feat] Support acknowledging a list of messages (#23)
1 parent a1ec5b9 commit 2980bbe

21 files changed

+270
-22
lines changed

include/pulsar/Consumer.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,12 @@ class PULSAR_PUBLIC Consumer {
164164
*/
165165
Result acknowledge(const MessageId& messageId);
166166

167+
/**
168+
* Acknowledge the consumption of a list of message.
169+
* @param messageIdList
170+
*/
171+
Result acknowledge(const MessageIdList& messageIdList);
172+
167173
/**
168174
* Asynchronously acknowledge the reception of a single message.
169175
*
@@ -186,6 +192,14 @@ class PULSAR_PUBLIC Consumer {
186192
*/
187193
void acknowledgeAsync(const MessageId& messageId, ResultCallback callback);
188194

195+
/**
196+
* Asynchronously acknowledge the consumption of a list of message.
197+
* @param messageIdList
198+
* @param callback the callback that is triggered when the message has been acknowledged or not
199+
* @return
200+
*/
201+
void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback);
202+
189203
/**
190204
* Acknowledge the reception of all the messages in the stream up to (and including)
191205
* the provided message.

include/pulsar/MessageId.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <iosfwd>
2626
#include <memory>
2727
#include <string>
28+
#include <vector>
2829

2930
namespace pulsar {
3031

@@ -107,6 +108,8 @@ class PULSAR_PUBLIC MessageId {
107108
typedef std::shared_ptr<MessageIdImpl> MessageIdImplPtr;
108109
MessageIdImplPtr impl_;
109110
};
111+
112+
typedef std::vector<MessageId> MessageIdList;
110113
} // namespace pulsar
111114

112115
#endif // MESSAGE_ID_H

lib/AckGroupingTracker.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
6262
*/
6363
virtual void addAcknowledge(const MessageId& msgId) {}
6464

65+
/**
66+
* Adding message ID list into ACK group for individual ACK.
67+
* @param[in] msgIds of the message to be ACKed.
68+
*/
69+
virtual void addAcknowledgeList(const MessageIdList& msgIds) {}
70+
6571
/**
6672
* Adding message ID into ACK group for cumulative ACK.
6773
* @param[in] msgId ID of the message to be ACKed.

lib/AckGroupingTrackerEnabled.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@ void AckGroupingTrackerEnabled::addAcknowledge(const MessageId& msgId) {
7676
}
7777
}
7878

79+
void AckGroupingTrackerEnabled::addAcknowledgeList(const MessageIdList& msgIds) {
80+
std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
81+
for (const auto& msgId : msgIds) {
82+
this->pendingIndividualAcks_.emplace(msgId);
83+
}
84+
if (this->ackGroupingMaxSize_ > 0 && this->pendingIndividualAcks_.size() >= this->ackGroupingMaxSize_) {
85+
this->flush();
86+
}
87+
}
88+
7989
void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& msgId) {
8090
std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
8191
if (msgId > this->nextCumulativeAckMsgId_) {

lib/AckGroupingTrackerEnabled.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
6161
void start() override;
6262
bool isDuplicate(const MessageId& msgId) override;
6363
void addAcknowledge(const MessageId& msgId) override;
64+
void addAcknowledgeList(const MessageIdList& msgIds) override;
6465
void addAcknowledgeCumulative(const MessageId& msgId) override;
6566
void close() override;
6667
void flush() override;

lib/Consumer.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ Result Consumer::acknowledge(const MessageId& messageId) {
115115
return result;
116116
}
117117

118+
Result Consumer::acknowledge(const MessageIdList& messageIdList) {
119+
if (!impl_) {
120+
return ResultConsumerNotInitialized;
121+
}
122+
Promise<bool, Result> promise;
123+
impl_->acknowledgeAsync(messageIdList, WaitForCallback(promise));
124+
Result result;
125+
promise.getFuture().get(result);
126+
return result;
127+
}
128+
118129
void Consumer::acknowledgeAsync(const Message& message, ResultCallback callback) {
119130
if (!impl_) {
120131
callback(ResultConsumerNotInitialized);
@@ -133,6 +144,15 @@ void Consumer::acknowledgeAsync(const MessageId& messageId, ResultCallback callb
133144
impl_->acknowledgeAsync(messageId, callback);
134145
}
135146

147+
void Consumer::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
148+
if (!impl_) {
149+
callback(ResultConsumerNotInitialized);
150+
return;
151+
}
152+
153+
impl_->acknowledgeAsync(messageIdList, callback);
154+
}
155+
136156
Result Consumer::acknowledgeCumulative(const Message& message) {
137157
return acknowledgeCumulative(message.getMessageId());
138158
}

lib/ConsumerImpl.cc

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -943,16 +943,17 @@ inline CommandSubscribe_InitialPosition ConsumerImpl::getInitialPosition() {
943943
BOOST_THROW_EXCEPTION(std::logic_error("Invalid InitialPosition enumeration value"));
944944
}
945945

946-
void ConsumerImpl::statsCallback(Result res, ResultCallback callback, CommandAck_AckType ackType) {
947-
consumerStatsBasePtr_->messageAcknowledged(res, ackType);
946+
void ConsumerImpl::statsAckCallback(Result res, ResultCallback callback, CommandAck_AckType ackType,
947+
uint32_t numAcks) {
948+
consumerStatsBasePtr_->messageAcknowledged(res, ackType, numAcks);
948949
if (callback) {
949950
callback(res);
950951
}
951952
}
952953

953954
void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
954-
ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
955-
callback, CommandAck_AckType_Individual);
955+
ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback, get_shared_this_ptr(),
956+
std::placeholders::_1, callback, CommandAck_AckType_Individual, 1);
956957
if (msgId.batchIndex() != -1 &&
957958
!batchAcknowledgementTracker_.isBatchReady(msgId, CommandAck_AckType_Individual)) {
958959
cb(ResultOk);
@@ -961,9 +962,19 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
961962
doAcknowledgeIndividual(msgId, cb);
962963
}
963964

965+
void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
966+
ResultCallback cb =
967+
std::bind(&ConsumerImpl::statsAckCallback, get_shared_this_ptr(), std::placeholders::_1, callback,
968+
proto::CommandAck_AckType_Individual, messageIdList.size());
969+
// Currently not supported batch message id individual index ack.
970+
this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdList);
971+
this->unAckedMessageTrackerPtr_->remove(messageIdList);
972+
cb(ResultOk);
973+
}
974+
964975
void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
965-
ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
966-
callback, CommandAck_AckType_Cumulative);
976+
ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback, get_shared_this_ptr(),
977+
std::placeholders::_1, callback, CommandAck_AckType_Cumulative, 1);
967978
if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) {
968979
cb(ResultCumulativeAcknowledgementNotAllowedError);
969980
return;

lib/ConsumerImpl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ class ConsumerImpl : public ConsumerImplBase {
109109
void receiveAsync(ReceiveCallback& callback) override;
110110
void unsubscribeAsync(ResultCallback callback) override;
111111
void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override;
112+
void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override;
112113
void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) override;
113114
void closeAsync(ResultCallback callback) override;
114115
void start() override;
@@ -181,8 +182,9 @@ class ConsumerImpl : public ConsumerImplBase {
181182
// TODO - Convert these functions to lambda when we move to C++11
182183
Result receiveHelper(Message& msg);
183184
Result receiveHelper(Message& msg, int timeout);
184-
void statsCallback(Result, ResultCallback, CommandAck_AckType);
185185
void executeNotifyCallback(Message& msg);
186+
void statsAckCallback(Result res, ResultCallback callback, CommandAck_AckType ackType,
187+
uint32_t numAcks = 1);
186188
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
187189
void failPendingReceiveCallback();
188190
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;

lib/ConsumerImplBase.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
5555
void batchReceiveAsync(BatchReceiveCallback callback);
5656
virtual void unsubscribeAsync(ResultCallback callback) = 0;
5757
virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0;
58+
virtual void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) = 0;
5859
virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0;
5960
virtual void closeAsync(ResultCallback callback) = 0;
6061
virtual void start() = 0;

lib/MultiTopicsConsumerImpl.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,43 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
647647
}
648648
}
649649

650+
void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
651+
if (state_ != Ready) {
652+
callback(ResultAlreadyClosed);
653+
return;
654+
}
655+
656+
std::unordered_map<std::string, MessageIdList> topicToMessageId;
657+
for (const MessageId& messageId : messageIdList) {
658+
auto topicName = messageId.getTopicName();
659+
topicToMessageId[topicName].emplace_back(messageId);
660+
}
661+
662+
auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
663+
auto cb = [callback, needCallBack](Result result) {
664+
if (result != ResultOk) {
665+
LOG_ERROR("Filed when acknowledge list: " << result);
666+
// set needCallBack is -1 to avoid repeated callback.
667+
needCallBack->store(-1);
668+
callback(result);
669+
return;
670+
}
671+
if (--(*needCallBack) == 0) {
672+
callback(result);
673+
}
674+
};
675+
for (const auto& kv : topicToMessageId) {
676+
auto optConsumer = consumers_.find(kv.first);
677+
if (optConsumer.is_present()) {
678+
unAckedMessageTrackerPtr_->remove(kv.second);
679+
optConsumer.value()->acknowledgeAsync(kv.second, cb);
680+
} else {
681+
LOG_ERROR("Message of topic: " << kv.first << " not in consumers");
682+
callback(ResultUnknownError);
683+
}
684+
}
685+
}
686+
650687
void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
651688
callback(ResultOperationNotSupported);
652689
}

0 commit comments

Comments
 (0)