Skip to content

Commit c593959

Browse files
committed
Make close idempotent and improve tests
1 parent 3433821 commit c593959

File tree

2 files changed

+16
-11
lines changed

2 files changed

+16
-11
lines changed

lib/RetryableOperationCache.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
101101
decltype(operations_) operations;
102102
{
103103
std::lock_guard<std::mutex> lock{mutex_};
104+
if (closed_) {
105+
return;
106+
}
104107
operations.swap(operations_);
105108
closed_ = true;
106109
}

tests/LookupServiceTest.cc

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include <pulsar/Client.h>
2222

2323
#include <algorithm>
24-
#include <atomic>
2524
#include <boost/exception/all.hpp>
2625
#include <chrono>
2726
#include <future>
@@ -502,15 +501,13 @@ TEST(LookupServiceTest, testRedirectionLimit) {
502501
}
503502
}
504503

505-
static std::atomic_bool firstTime{true};
506-
507504
class MockLookupService : public BinaryProtoLookupService {
508505
public:
509506
using BinaryProtoLookupService::BinaryProtoLookupService;
510507

511508
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
512509
bool expected = true;
513-
if (firstTime.compare_exchange_strong(expected, false)) {
510+
if (firstTime_.compare_exchange_strong(expected, false)) {
514511
// Trigger the retry
515512
LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally");
516513
Promise<Result, LookupDataResultPtr> promise;
@@ -519,6 +516,9 @@ class MockLookupService : public BinaryProtoLookupService {
519516
}
520517
return BinaryProtoLookupService::getPartitionMetadataAsync(topicName);
521518
}
519+
520+
private:
521+
std::atomic_bool firstTime_{true};
522522
};
523523

524524
TEST(LookupServiceTest, testAfterClientShutdown) {
@@ -531,14 +531,16 @@ TEST(LookupServiceTest, testAfterClientShutdown) {
531531
std::promise<Result> promise;
532532
client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", ConsumerConfiguration{},
533533
[&promise](Result result, const Consumer&) { promise.set_value(result); });
534+
// When shutdown is called, there is a pending lookup request due to the 1st lookup is failed in
535+
// MockLookupService. Verify shutdown will cancel it and return ResultDisconnected.
534536
client->shutdown();
535537
EXPECT_EQ(ResultDisconnected, promise.get_future().get());
536538

537-
firstTime = true;
538-
std::promise<Result> promise2;
539+
// A new subscribeAsync call will fail immediately in the current thread
540+
Result result = ResultOk;
539541
client->subscribeAsync("lookup-service-test-retry-after-destroyed", "sub", ConsumerConfiguration{},
540-
[&promise2](Result result, const Consumer&) { promise2.set_value(result); });
541-
EXPECT_EQ(ResultAlreadyClosed, promise2.get_future().get());
542+
[&result](Result innerResult, const Consumer&) { result = innerResult; });
543+
EXPECT_EQ(ResultAlreadyClosed, result);
542544
}
543545

544546
TEST(LookupServiceTest, testRetryAfterDestroyed) {
@@ -551,12 +553,12 @@ TEST(LookupServiceTest, testRetryAfterDestroyed) {
551553
RetryableLookupService::create(internalLookupService, std::chrono::seconds(30), executorProvider);
552554

553555
// Simulate the race condition that `getPartitionMetadataAsync` is called after `close` is called on the
554-
// lookup service.
556+
// lookup service. It's expected the request fails immediately with ResultAlreadyClosed.
555557
lookupService->close();
556-
std::atomic<Result> result{ResultUnknownError};
558+
Result result = ResultOk;
557559
lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed"))
558560
.addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; });
559-
EXPECT_EQ(ResultAlreadyClosed, result.load());
561+
EXPECT_EQ(ResultAlreadyClosed, result);
560562
pool.close();
561563
executorProvider->close();
562564
}

0 commit comments

Comments
 (0)