Skip to content

Commit 42bcc58

Browse files
committed
Fix AckGroupingTracker not flushed during close
1 parent c9ab527 commit 42bcc58

File tree

2 files changed

+10
-3
lines changed

2 files changed

+10
-3
lines changed

lib/AckGroupingTrackerEnabled.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "AckGroupingTrackerEnabled.h"
2121

2222
#include <climits>
23+
#include <memory>
2324
#include <mutex>
2425

2526
#include "ClientConnection.h"
@@ -172,9 +173,10 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
172173
std::lock_guard<std::mutex> lock(this->mutexTimer_);
173174
this->timer_ = this->executor_->createDeadlineTimer();
174175
this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
175-
auto self = shared_from_this();
176-
this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void {
177-
if (!ec) {
176+
std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
177+
this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
178+
auto self = weakSelf.lock();
179+
if (self && !ec) {
178180
this->flush();
179181
this->scheduleTimer();
180182
}

lib/ConsumerImpl.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,6 +1331,11 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) {
13311331
incomingMessages_.close();
13321332

13331333
// Flush pending grouped ACK requests.
1334+
if (ackGroupingTrackerPtr_.use_count() != 1) {
1335+
LOG_ERROR("AckGroupingTracker is shared by other "
1336+
<< (ackGroupingTrackerPtr_.use_count() - 1)
1337+
<< " threads, which will prevent flushing the ACKs");
1338+
}
13341339
ackGroupingTrackerPtr_.reset();
13351340
negativeAcksTracker_->close();
13361341

0 commit comments

Comments
 (0)