@@ -293,13 +293,40 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
293293 Result handleResult = ResultOk;
294294
295295 if (result == ResultOk) {
296- LOG_INFO (getName () << " Created consumer on broker " << cnx->cnxString ());
297296 {
298297 Lock mutexLock (mutex_);
298+ if (!changeToReadyState ()) {
299+ resetCnx ();
300+ // The consumer has been
301+ auto client = client_.lock ();
302+ if (client) {
303+ LOG_INFO (getName () << " Closing subscribed consumer since it was already closed" );
304+ int requestId = client->newRequestId ();
305+ auto name = getName ();
306+ cnx->sendRequestWithId (Commands::newCloseConsumer (consumerId_, requestId), requestId)
307+ .addListener ([name](Result result, const ResponseData&) {
308+ if (result == ResultOk) {
309+ LOG_INFO (name << " Closed consumer successfully after subscribe completed" );
310+ } else {
311+ LOG_WARN (name << " Failed to close consumer: " << strResult (result));
312+ }
313+ });
314+ } else {
315+ // This should not happen normally because if client is destroyed, the connection pool
316+ // should also be closed, which means all connections should be closed. Close the
317+ // connection to let broker know this registered consumer is inactive.
318+ LOG_WARN (getName ()
319+ << " Client already closed when subscribe completed, close the connection "
320+ << cnx->cnxString ());
321+ cnx->close (ResultNotConnected);
322+ }
323+ return ResultAlreadyClosed;
324+ }
325+
326+ LOG_INFO (getName () << " Created consumer on broker " << cnx->cnxString ());
299327 setCnx (cnx);
300328 incomingMessages_.clear ();
301329 possibleSendToDeadLetterTopicMessages_.clear ();
302- state_ = Ready;
303330 backoff_.reset ();
304331 if (!messageListener_ && config_.getReceiverQueueSize () == 0 ) {
305332 // Complicated logic since we don't have a isLocked() function for mutex
0 commit comments