Skip to content

Commit d1347c7

Browse files
authored
[fix][client] Memory leak during GET_LAST_MESSAGE_ID command processing. (apache#301)
1 parent ffee4a0 commit d1347c7

File tree

3 files changed

+40
-19
lines changed

3 files changed

+40
-19
lines changed

lib/ClientConnection.cc

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,6 +1142,13 @@ void ClientConnection::handleLookupTimeout(const boost::system::error_code& ec,
11421142
}
11431143
}
11441144

1145+
void ClientConnection::handleGetLastMessageIdTimeout(const boost::system::error_code& ec,
1146+
ClientConnection::LastMessageIdRequestData data) {
1147+
if (!ec) {
1148+
data.promise->setFailed(ResultTimeout);
1149+
}
1150+
}
1151+
11451152
void ClientConnection::handleKeepAliveTimeout() {
11461153
if (isClosed()) {
11471154
return;
@@ -1251,7 +1258,7 @@ void ClientConnection::close(Result result) {
12511258
kv.second.setFailed(result);
12521259
}
12531260
for (auto& kv : pendingGetLastMessageIdRequests) {
1254-
kv.second.setFailed(result);
1261+
kv.second.promise->setFailed(result);
12551262
}
12561263
for (auto& kv : pendingGetNamespaceTopicsRequests) {
12571264
kv.second.setFailed(result);
@@ -1299,23 +1306,24 @@ Commands::ChecksumType ClientConnection::getChecksumType() const {
12991306
Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(uint64_t consumerId,
13001307
uint64_t requestId) {
13011308
Lock lock(mutex_);
1302-
Promise<Result, GetLastMessageIdResponse> promise;
1309+
auto promise = std::make_shared<GetLastMessageIdResponsePromisePtr::element_type>();
13031310
if (isClosed()) {
13041311
lock.unlock();
13051312
LOG_ERROR(cnxString_ << " Client is not connected to the broker");
1306-
promise.setFailed(ResultNotConnected);
1307-
return promise.getFuture();
1313+
promise->setFailed(ResultNotConnected);
1314+
return promise->getFuture();
13081315
}
13091316

1310-
pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
1317+
LastMessageIdRequestData requestData;
1318+
requestData.promise = promise;
1319+
requestData.timer = executor_->createDeadlineTimer();
1320+
requestData.timer->expires_from_now(operationsTimeout_);
1321+
requestData.timer->async_wait(std::bind(&ClientConnection::handleGetLastMessageIdTimeout,
1322+
shared_from_this(), std::placeholders::_1, requestData));
1323+
pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, requestData));
13111324
lock.unlock();
1312-
sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)
1313-
.addListener([promise](Result result, const ResponseData& data) {
1314-
if (result != ResultOk) {
1315-
promise.setFailed(result);
1316-
}
1317-
});
1318-
return promise.getFuture();
1325+
sendCommand(Commands::newGetLastMessageId(consumerId, requestId));
1326+
return promise->getFuture();
13191327
}
13201328

13211329
Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
@@ -1635,11 +1643,11 @@ void ClientConnection::handleError(const proto::CommandError& error) {
16351643
PendingGetLastMessageIdRequestsMap::iterator it =
16361644
pendingGetLastMessageIdRequests_.find(error.request_id());
16371645
if (it != pendingGetLastMessageIdRequests_.end()) {
1638-
auto getLastMessageIdPromise = it->second;
1646+
auto getLastMessageIdPromise = it->second.promise;
16391647
pendingGetLastMessageIdRequests_.erase(it);
16401648
lock.unlock();
16411649

1642-
getLastMessageIdPromise.setFailed(result);
1650+
getLastMessageIdPromise->setFailed(result);
16431651
} else {
16441652
PendingGetNamespaceTopicsMap::iterator it =
16451653
pendingGetNamespaceTopicsRequests_.find(error.request_id());
@@ -1719,16 +1727,16 @@ void ClientConnection::handleGetLastMessageIdResponse(
17191727
auto it = pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
17201728

17211729
if (it != pendingGetLastMessageIdRequests_.end()) {
1722-
auto getLastMessageIdPromise = it->second;
1730+
auto getLastMessageIdPromise = it->second.promise;
17231731
pendingGetLastMessageIdRequests_.erase(it);
17241732
lock.unlock();
17251733

17261734
if (getLastMessageIdResponse.has_consumer_mark_delete_position()) {
1727-
getLastMessageIdPromise.setValue(
1735+
getLastMessageIdPromise->setValue(
17281736
{toMessageId(getLastMessageIdResponse.last_message_id()),
17291737
toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
17301738
} else {
1731-
getLastMessageIdPromise.setValue({toMessageId(getLastMessageIdResponse.last_message_id())});
1739+
getLastMessageIdPromise->setValue({toMessageId(getLastMessageIdResponse.last_message_id())});
17321740
}
17331741
} else {
17341742
lock.unlock();

lib/ClientConnection.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
201201
DeadlineTimerPtr timer;
202202
};
203203

204+
struct LastMessageIdRequestData {
205+
GetLastMessageIdResponsePromisePtr promise;
206+
DeadlineTimerPtr timer;
207+
};
208+
204209
/*
205210
* handler for connectAsync
206211
* creates a ConnectionPtr which has a valid ClientConnection object
@@ -243,6 +248,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
243248

244249
void handleLookupTimeout(const boost::system::error_code&, LookupRequestData);
245250

251+
void handleGetLastMessageIdTimeout(const boost::system::error_code&, LastMessageIdRequestData data);
252+
246253
void handleKeepAliveTimeout();
247254

248255
template <typename Handler>
@@ -342,7 +349,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
342349
typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> PendingConsumerStatsMap;
343350
PendingConsumerStatsMap pendingConsumerStatsMap_;
344351

345-
typedef std::map<long, Promise<Result, GetLastMessageIdResponse>> PendingGetLastMessageIdRequestsMap;
352+
typedef std::map<long, LastMessageIdRequestData> PendingGetLastMessageIdRequestsMap;
346353
PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;
347354

348355
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;

lib/GetLastMessageIdResponse.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,14 @@
2323

2424
#include <iostream>
2525

26+
#include "Future.h"
27+
2628
namespace pulsar {
2729

30+
class GetLastMessageIdResponse;
31+
typedef Promise<Result, GetLastMessageIdResponse> GetLastMessageIdResponsePromise;
32+
typedef std::shared_ptr<GetLastMessageIdResponsePromise> GetLastMessageIdResponsePromisePtr;
33+
2834
class GetLastMessageIdResponse {
2935
friend std::ostream& operator<<(std::ostream& os, const GetLastMessageIdResponse& response) {
3036
os << "lastMessageId: " << response.lastMessageId_;
@@ -52,7 +58,7 @@ class GetLastMessageIdResponse {
5258
private:
5359
MessageId lastMessageId_;
5460
MessageId markDeletePosition_;
55-
bool hasMarkDeletePosition_;
61+
bool hasMarkDeletePosition_ = false;
5662
};
5763

5864
typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;

0 commit comments

Comments
 (0)