@@ -267,6 +267,16 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
267267 unAckedMessageTrackerPtr_->clear ();
268268
269269 ClientImplPtr client = client_.lock ();
270+
271+ Lock lock{mutex_};
272+ const auto state = state_.load ();
273+ if (state == Closing || state == Closed) {
274+ lock.unlock ();
275+ Promise<Result, bool > promise;
276+ promise.setFailed (ResultAlreadyClosed);
277+ return promise.getFuture ();
278+ }
279+
270280 long requestId = client->newRequestId ();
271281 SharedBuffer cmd = Commands::newSubscribe (
272282 topic (), subscription_, consumerId_, requestId, getSubType (), getConsumerName (), subscriptionMode_,
@@ -1358,7 +1368,13 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) {
13581368 }
13591369 ackGroupingTrackerPtr_.reset ();
13601370 negativeAcksTracker_->close ();
1371+ cancelTimers ();
13611372
1373+ // Prevent the race that
1374+ // 1. In `connectionOpened`, setCnx() is called and the state is Ready.
1375+ // 2. In this method, state is changed to Closing and a CloseConsumer request is sent.
1376+ // 3. In `connectionOpened`, a Subscribe request is sent.
1377+ std::lock_guard<std::mutex> lock{mutex_};
13621378 ClientConnectionPtr cnx = getCnx ().lock ();
13631379 if (!cnx) {
13641380 // If connection is gone, also the consumer is closed on the broker side
@@ -1373,8 +1389,6 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) {
13731389 return ;
13741390 }
13751391
1376- cancelTimers ();
1377-
13781392 int requestId = client->newRequestId ();
13791393 auto self = get_shared_this_ptr ();
13801394 cnx->sendRequestWithId (Commands::newCloseConsumer (consumerId_, requestId), requestId)
0 commit comments