Skip to content

Commit 2ec734b

Browse files
Fix invalid memory access on the first pending batch receive callback (#441)
1 parent 5940cb5 commit 2ec734b

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed

lib/ConsumerImplBase.cc

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,7 @@ void ConsumerImplBase::doBatchReceiveTimeTask() {
7676
long diff =
7777
batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
7878
if (diff <= 0) {
79-
Lock batchOptionLock(batchReceiveOptionMutex_);
80-
notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
81-
batchOptionLock.unlock();
82-
batchPendingReceives_.pop();
79+
notifyBatchPendingReceivedCallback(popBatchReceiveCallback());
8380
} else {
8481
hasPendingReceives = true;
8582
timeToWaitMs = diff;
@@ -96,20 +93,17 @@ void ConsumerImplBase::doBatchReceiveTimeTask() {
9693
void ConsumerImplBase::failPendingBatchReceiveCallback() {
9794
Lock lock(batchPendingReceiveMutex_);
9895
while (!batchPendingReceives_.empty()) {
99-
OpBatchReceive opBatchReceive = batchPendingReceives_.front();
100-
batchPendingReceives_.pop();
101-
listenerExecutor_->postWork(
102-
[opBatchReceive]() { opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, {}); });
96+
auto callback = popBatchReceiveCallback();
97+
listenerExecutor_->postWork([callback]() { callback(ResultAlreadyClosed, {}); });
10398
}
10499
}
105100

106101
void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
107102
Lock lock(batchPendingReceiveMutex_);
108103
if (!batchPendingReceives_.empty()) {
109-
OpBatchReceive& batchReceive = batchPendingReceives_.front();
110-
batchPendingReceives_.pop();
104+
auto callback = popBatchReceiveCallback();
111105
lock.unlock();
112-
notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
106+
notifyBatchPendingReceivedCallback(callback);
113107
}
114108
}
115109

lib/ConsumerImplBase.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,14 @@ class ConsumerImplBase : public HandlerBase {
112112

113113
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
114114

115+
// Note: it should be protected by batchPendingReceiveMutex_ and called when `batchPendingReceives_` is
116+
// not empty
117+
BatchReceiveCallback popBatchReceiveCallback() {
118+
auto callback = std::move(batchPendingReceives_.front().batchReceiveCallback_);
119+
batchPendingReceives_.pop();
120+
return callback;
121+
}
122+
115123
friend class MultiTopicsConsumerImpl;
116124
friend class PulsarFriend;
117125
};

0 commit comments

Comments
 (0)