Skip to content

Commit fbe65bb

Browse files
committed
Fix topic lookup segmentation fault after client is closed
1 parent 648b48b commit fbe65bb

File tree

7 files changed

+109
-24
lines changed

7 files changed

+109
-24
lines changed

lib/ClientImpl.cc

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,25 @@ typedef std::unique_lock<std::mutex> Lock;
7878

7979
typedef std::vector<std::string> StringList;
8080

81+
static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl,
82+
const ClientConfiguration& clientConfiguration,
83+
ConnectionPool& pool, const AuthenticationPtr& auth) {
84+
if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
85+
LOG_DEBUG("Using HTTP Lookup");
86+
return std::make_shared<HTTPLookupService>(serviceUrl, std::cref(clientConfiguration),
87+
std::cref(auth));
88+
} else {
89+
LOG_DEBUG("Using Binary Lookup");
90+
return std::make_shared<BinaryProtoLookupService>(serviceUrl, std::ref(pool),
91+
std::cref(clientConfiguration));
92+
}
93+
}
94+
8195
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
96+
: ClientImpl(serviceUrl, clientConfiguration, &defaultLookupServiceFactory) {}
97+
98+
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
99+
LookupServiceFactory&& lookupServiceFactory)
82100
: mutex_(),
83101
state_(Open),
84102
clientConfiguration_(ClientConfiguration(clientConfiguration)
@@ -95,7 +113,8 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
95113
consumerIdGenerator_(0),
96114
closingError(ResultOk),
97115
useProxy_(false),
98-
lookupCount_(0L) {
116+
lookupCount_(0L),
117+
lookupServiceFactory_(std::move(lookupServiceFactory)) {
99118
std::unique_ptr<LoggerFactory> loggerFactory = clientConfiguration_.impl_->takeLogger();
100119
if (loggerFactory) {
101120
LogUtils::setLoggerFactory(std::move(loggerFactory));
@@ -106,19 +125,9 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
106125
ClientImpl::~ClientImpl() { shutdown(); }
107126

108127
LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
109-
LookupServicePtr underlyingLookupServicePtr;
110-
if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
111-
LOG_DEBUG("Using HTTP Lookup");
112-
underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
113-
serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr()));
114-
} else {
115-
LOG_DEBUG("Using Binary Lookup");
116-
underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
117-
serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
118-
}
119-
120128
auto lookupServicePtr = RetryableLookupService::create(
121-
underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
129+
lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_, clientConfiguration_.getAuthPtr()),
130+
clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
122131
return lookupServicePtr;
123132
}
124133

@@ -665,7 +674,6 @@ void ClientImpl::closeAsync(const CloseCallback& callback) {
665674
state_ = Closing;
666675

667676
memoryLimitController_.close();
668-
lookupServicePtr_->close();
669677
for (const auto& it : redirectedClusterLookupServicePtrs_) {
670678
it.second->close();
671679
}
@@ -767,6 +775,7 @@ void ClientImpl::shutdown() {
767775
<< " consumers have been shutdown.");
768776
}
769777

778+
lookupServicePtr_->close();
770779
if (!pool_.close()) {
771780
// pool_ has already been closed. It means shutdown() has been called before.
772781
return;

lib/ClientImpl.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
5454

5555
class LookupService;
5656
using LookupServicePtr = std::shared_ptr<LookupService>;
57+
using LookupServiceFactory = std::function<LookupServicePtr(const std::string&, const ClientConfiguration&,
58+
ConnectionPool& pool, const AuthenticationPtr&)>;
5759

5860
class ProducerImplBase;
5961
using ProducerImplBaseWeakPtr = std::weak_ptr<ProducerImplBase>;
@@ -70,6 +72,11 @@ std::string generateRandomName();
7072
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
7173
public:
7274
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
75+
76+
// only for tests
77+
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
78+
LookupServiceFactory&& lookupServiceFactory);
79+
7380
virtual ~ClientImpl();
7481

7582
/**
@@ -205,6 +212,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
205212
std::atomic<Result> closingError;
206213
std::atomic<bool> useProxy_;
207214
std::atomic<uint64_t> lookupCount_;
215+
LookupServiceFactory lookupServiceFactory_;
208216

209217
friend class Client;
210218
};

lib/ResultUtils.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ inline bool isResultRetryable(Result result) {
4949
ResultLookupError,
5050
ResultTooManyLookupRequestException,
5151
ResultProducerBlockedQuotaExceededException,
52-
ResultProducerBlockedQuotaExceededError};
52+
ResultProducerBlockedQuotaExceededError,
53+
ResultAlreadyClosed};
5354
return fatalResults.find(static_cast<int>(result)) == fatalResults.cend();
5455
}
5556

lib/RetryableLookupService.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
#pragma once
2020

21-
#include <chrono>
22-
2321
#include "LookupDataResult.h"
2422
#include "LookupService.h"
2523
#include "NamespaceName.h"
@@ -41,10 +39,10 @@ class RetryableLookupService : public LookupService {
4139
: RetryableLookupService(std::forward<Args>(args)...) {}
4240

4341
void close() override {
44-
lookupCache_->clear();
45-
partitionLookupCache_->clear();
46-
namespaceLookupCache_->clear();
47-
getSchemaCache_->clear();
42+
lookupCache_->close();
43+
partitionLookupCache_->close();
44+
namespaceLookupCache_->close();
45+
getSchemaCache_->close();
4846
}
4947

5048
template <typename... Args>
@@ -89,7 +87,7 @@ class RetryableLookupService : public LookupService {
8987

9088
RetryableLookupService(std::shared_ptr<LookupService> lookupService, TimeDuration timeout,
9189
ExecutorServiceProviderPtr executorProvider)
92-
: lookupService_(lookupService),
90+
: lookupService_(std::move(lookupService)),
9391
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeout)),
9492
partitionLookupCache_(
9593
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeout)),

lib/RetryableOperationCache.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
5858

5959
Future<Result, T> run(const std::string& key, std::function<Future<Result, T>()>&& func) {
6060
std::unique_lock<std::mutex> lock{mutex_};
61+
if (closed_) {
62+
Promise<Result, T> promise;
63+
promise.setFailed(ResultAlreadyClosed);
64+
return promise.getFuture();
65+
}
6166
auto it = operations_.find(key);
6267
if (it == operations_.end()) {
6368
DeadlineTimerPtr timer;
@@ -92,11 +97,12 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
9297
}
9398
}
9499

95-
void clear() {
100+
void close() {
96101
decltype(operations_) operations;
97102
{
98103
std::lock_guard<std::mutex> lock{mutex_};
99104
operations.swap(operations_);
105+
closed_ = true;
100106
}
101107
// cancel() could trigger the listener to erase the key from operations, so we should use a swap way
102108
// to release the lock here
@@ -110,6 +116,7 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
110116
const TimeDuration timeout_;
111117

112118
std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>> operations_;
119+
bool closed_{false};
113120
mutable std::mutex mutex_;
114121

115122
DECLARE_LOG_OBJECT()

tests/LookupServiceTest.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <pulsar/Client.h>
2222

2323
#include <algorithm>
24+
#include <atomic>
2425
#include <boost/exception/all.hpp>
2526
#include <chrono>
2627
#include <future>
@@ -36,9 +37,11 @@
3637
#include "lib/Future.h"
3738
#include "lib/HTTPLookupService.h"
3839
#include "lib/LogUtils.h"
40+
#include "lib/LookupDataResult.h"
3941
#include "lib/RetryableLookupService.h"
4042
#include "lib/TimeUtils.h"
4143
#include "lib/Utils.h"
44+
#include "pulsar/Result.h"
4245

4346
DECLARE_LOG_OBJECT()
4447

@@ -500,3 +503,62 @@ TEST(LookupServiceTest, testRedirectionLimit) {
500503
}
501504
}
502505
}
506+
507+
static std::atomic_bool firstTime{true};
508+
509+
class MockLookupService : public BinaryProtoLookupService {
510+
public:
511+
using BinaryProtoLookupService::BinaryProtoLookupService;
512+
513+
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
514+
bool expected = true;
515+
if (firstTime.compare_exchange_strong(expected, false)) {
516+
// Trigger the retry
517+
LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally");
518+
Promise<Result, LookupDataResultPtr> promise;
519+
promise.setFailed(ResultRetryable);
520+
return promise.getFuture();
521+
}
522+
return BinaryProtoLookupService::getPartitionMetadataAsync(topicName);
523+
}
524+
};
525+
526+
TEST(LookupServiceTest, testAfterClientShutdown) {
527+
auto client = std::make_shared<ClientImpl>("pulsar://localhost:6650", ClientConfiguration{},
528+
[](const std::string& serviceUrl, const ClientConfiguration&,
529+
ConnectionPool& pool, const AuthenticationPtr&) {
530+
return std::make_shared<MockLookupService>(
531+
serviceUrl, pool, ClientConfiguration{});
532+
});
533+
std::promise<Result> promise;
534+
client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", ConsumerConfiguration{},
535+
[&promise](Result result, const Consumer&) { promise.set_value(result); });
536+
client->shutdown();
537+
EXPECT_EQ(ResultDisconnected, promise.get_future().get());
538+
539+
firstTime = true;
540+
std::promise<Result> promise2;
541+
client->subscribeAsync("lookup-service-test-retry-after-destroyed", "sub", ConsumerConfiguration{},
542+
[&promise2](Result result, const Consumer&) { promise2.set_value(result); });
543+
EXPECT_EQ(ResultAlreadyClosed, promise2.get_future().get());
544+
}
545+
546+
TEST(LookupServiceTest, testRetryAfterDestroyed) {
547+
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
548+
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
549+
550+
auto internalLookupService =
551+
std::make_shared<MockLookupService>("pulsar://localhost:6650", pool, ClientConfiguration{});
552+
auto lookupService =
553+
RetryableLookupService::create(internalLookupService, std::chrono::seconds(30), executorProvider);
554+
555+
// Simulate the race condition that `getPartitionMetadataAsync` is called after `close` is called on the
556+
// lookup service.
557+
lookupService->close();
558+
std::atomic<Result> result{ResultUnknownError};
559+
lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed"))
560+
.addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; });
561+
EXPECT_EQ(ResultAlreadyClosed, result.load());
562+
pool.close();
563+
executorProvider->close();
564+
}

tests/RetryableOperationCacheTest.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ TEST_F(RetryableOperationCacheTest, testClear) {
124124
futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100}));
125125
}
126126
ASSERT_EQ(getSize(*cache), 10);
127-
cache->clear();
127+
cache->close();
128128
for (auto&& future : futures_) {
129129
int value;
130130
// All cancelled futures complete with ResultDisconnected and the default int value

0 commit comments

Comments
 (0)