@@ -74,6 +74,25 @@ static boost::optional<MessageId> getStartMessageId(const boost::optional<Messag
7474 return startMessageId;
7575}
7676
77+ static AckGroupingTracker* newAckGroupingTracker (const ClientImplPtr& client, const std::string& topic,
78+ uint64_t consumerId, const ConsumerConfiguration& config) {
79+ const auto requestIdGenerator = client->getRequestIdGenerator ();
80+ const auto requestIdSupplier = [requestIdGenerator] { return (*requestIdGenerator)++; };
81+
82+ if (TopicName::get (topic)->isPersistent ()) {
83+ if (config.getAckGroupingTimeMs () > 0 ) {
84+ return new AckGroupingTrackerEnabled (
85+ requestIdSupplier, consumerId, config.isAckReceiptEnabled (), config.getAckGroupingTimeMs (),
86+ config.getAckGroupingMaxSize (), client->getIOExecutorProvider ()->get ());
87+ } else {
88+ return new AckGroupingTrackerDisabled (requestIdSupplier, consumerId,
89+ config.isAckReceiptEnabled ());
90+ }
91+ } else {
92+ return new AckGroupingTracker (requestIdSupplier, consumerId, config.isAckReceiptEnabled ());
93+ }
94+ }
95+
7796ConsumerImpl::ConsumerImpl (const ClientImplPtr& client, const std::string& topic,
7897 const std::string& subscriptionName, const ConsumerConfiguration& conf,
7998 bool isPersistent, const ConsumerInterceptorsPtr& interceptors,
@@ -105,6 +124,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic
105124 consumerStr_(" [" + topic + " , " + subscriptionName + " , " + std::to_string(consumerId_) + "] "),
106125 messageListenerRunning_(!conf.isStartPaused()),
107126 negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, *this , conf)),
127+ ackGroupingTrackerPtr_(newAckGroupingTracker(client, topic, consumerId_, conf)),
108128 readCompacted_(conf.isReadCompacted()),
109129 startMessageId_(getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())),
110130 maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
@@ -198,38 +218,7 @@ const std::string& ConsumerImpl::getTopic() const { return topic(); }
198218
199219void ConsumerImpl::start () {
200220 HandlerBase::start ();
201-
202- std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr ()};
203- auto connectionSupplier = [weakSelf]() -> ClientConnectionPtr {
204- auto self = weakSelf.lock ();
205- if (!self) {
206- return nullptr ;
207- }
208- return self->getCnx ().lock ();
209- };
210-
211- // NOTE: start() is always called in `ClientImpl`'s method, so lock() returns not null
212- const auto requestIdGenerator = client_.lock ()->getRequestIdGenerator ();
213- const auto requestIdSupplier = [requestIdGenerator] { return (*requestIdGenerator)++; };
214-
215- // Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
216- // constructor completed.
217- if (TopicName::get (topic ())->isPersistent ()) {
218- if (config_.getAckGroupingTimeMs () > 0 ) {
219- ackGroupingTrackerPtr_.reset (new AckGroupingTrackerEnabled (
220- connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled (),
221- config_.getAckGroupingTimeMs (), config_.getAckGroupingMaxSize (),
222- client_.lock ()->getIOExecutorProvider ()->get ()));
223- } else {
224- ackGroupingTrackerPtr_.reset (new AckGroupingTrackerDisabled (
225- connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled ()));
226- }
227- } else {
228- LOG_INFO (getName () << " ACK will NOT be sent to broker for this non-persistent topic." );
229- ackGroupingTrackerPtr_.reset (new AckGroupingTracker (connectionSupplier, requestIdSupplier,
230- consumerId_, config_.isAckReceiptEnabled ()));
231- }
232- ackGroupingTrackerPtr_->start ();
221+ ackGroupingTrackerPtr_->start (std::static_pointer_cast<HandlerBase>(shared_from_this ()));
233222}
234223
235224void ConsumerImpl::beforeConnectionChange (ClientConnection& cnx) { cnx.removeConsumer (consumerId_); }
@@ -591,17 +580,16 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
591580 LOG_DEBUG (getName () << " metadata.has_num_messages_in_batch() = "
592581 << metadata.has_num_messages_in_batch ());
593582
594- uint32_t numOfMessageReceived = m.impl_ ->metadata .num_messages_in_batch ();
595- auto ackGroupingTrackerPtr = ackGroupingTrackerPtr_;
596- if (ackGroupingTrackerPtr == nullptr ) { // The consumer is closing
583+ const auto state = state_.load (std::memory_order_relaxed);
584+ if (state == Closing || state == Closed) {
597585 return ;
598586 }
599- if (ackGroupingTrackerPtr->isDuplicate (m.getMessageId ())) {
587+ uint32_t numOfMessageReceived = m.impl_ ->metadata .num_messages_in_batch ();
588+ if (ackGroupingTrackerPtr_->isDuplicate (m.getMessageId ())) {
600589 LOG_DEBUG (getName () << " Ignoring message as it was ACKed earlier by same consumer." );
601590 increaseAvailablePermits (cnx, numOfMessageReceived);
602591 return ;
603592 }
604- ackGroupingTrackerPtr.reset ();
605593
606594 if (metadata.has_num_messages_in_batch ()) {
607595 BitSet::Data words (msg.ack_set_size ());
@@ -1340,12 +1328,8 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) {
13401328 incomingMessages_.close ();
13411329
13421330 // Flush pending grouped ACK requests.
1343- if (ackGroupingTrackerPtr_.use_count () != 1 ) {
1344- LOG_ERROR (" AckGroupingTracker is shared by other "
1345- << (ackGroupingTrackerPtr_.use_count () - 1 )
1346- << " threads, which will prevent flushing the ACKs" );
1347- }
1348- ackGroupingTrackerPtr_.reset ();
1331+ ackGroupingTrackerPtr_->flushAndClean ();
1332+ ackGroupingTrackerPtr_->close ();
13491333 negativeAcksTracker_->close ();
13501334
13511335 ClientConnectionPtr cnx = getCnx ().lock ();
@@ -1369,13 +1353,12 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) {
13691353 cnx->sendRequestWithId (Commands::newCloseConsumer (consumerId_, requestId), requestId)
13701354 .addListener ([self, callback](Result result, const ResponseData&) { callback (result); });
13711355}
1372-
13731356const std::string& ConsumerImpl::getName () const { return consumerStr_; }
13741357
13751358void ConsumerImpl::shutdown () { internalShutdown (); }
13761359
13771360void ConsumerImpl::internalShutdown () {
1378- ackGroupingTrackerPtr_. reset ();
1361+ ackGroupingTrackerPtr_-> close ();
13791362 incomingMessages_.clear ();
13801363 possibleSendToDeadLetterTopicMessages_.clear ();
13811364 resetCnx ();
0 commit comments