diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index cc8f3204..0ecf3dd8 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -160,8 +160,14 @@ Future ProducerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. auto self = shared_from_this(); setFirstRequestIdAfterConnect(requestId); + newProducerRequestId_ = requestId; cnx->sendRequestWithId(cmd, requestId) - .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { + .addListener([this, self, cnx, promise, requestId](Result result, const ResponseData& responseData) { + if (!newProducerRequestId_ || newProducerRequestId_.get() != requestId) { + LOG_WARN(getName() << "Received response for an old request, ignoring"); + return; + } + Result handleResult = handleCreateProducer(cnx, result, responseData); if (handleResult == ResultOk) { promise.setSuccess(); @@ -977,6 +983,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer void ProducerImpl::disconnectProducer(const boost::optional& assignedBrokerUrl) { LOG_INFO("Broker notification of Closed producer: " << producerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + assignedBrokerUrl.get()) : "")); + newProducerRequestId_.reset(); resetCnx(); scheduleReconnection(assignedBrokerUrl); } diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index 77bd6d1a..e81a44d2 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -214,6 +214,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { ProducerInterceptorsPtr interceptors_; bool retryOnCreationError_; + + boost::optional newProducerRequestId_; }; struct ProducerImplCmp {