Skip to content

Commit e737716

Browse files
Fix the wrong backoff computation when retrying (apache#296)
### Motivation All the retryable operations share the same `Backoff` object in `RetryableLookupService`, so if the reconnection happens for some times, the delay of retrying will keeps the maximum value (30 seconds). ### Modifications Refactor the design of the `RetryableLookupService`: - Add a `RetryableOperation` class to represent a retryable operation, each instance has its own `Backoff` object. The operation could only be executed once. - Add a `RetryableOperationCache` class to represent a map that maps a specific name to its associated operation. It's an optimization that if an operation (e.g. find the owner topic of topic A) was not complete while the same operation was executed, the future would be reused. - In `RetryableLookupService`, just maintain some caches for different operations. - Add `RetryableOperationCacheTest` to verify the behaviors.
1 parent 51d11db commit e737716

File tree

9 files changed

+444
-133
lines changed

9 files changed

+444
-133
lines changed

lib/ClientImpl.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
598598
state_ = Closing;
599599

600600
memoryLimitController_.close();
601+
lookupServicePtr_->close();
601602

602603
auto producers = producers_.move();
603604
auto consumers = consumers_.move();

lib/LookupService.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ class LookupService {
8686
const std::string& version = "") = 0;
8787

8888
virtual ~LookupService() {}
89+
90+
virtual void close() {}
8991
};
9092

9193
typedef std::shared_ptr<LookupService> LookupServicePtr;

lib/RetryableLookupService.h

Lines changed: 26 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,17 @@
1818
*/
1919
#pragma once
2020

21-
#include <algorithm>
22-
#include <memory>
23-
24-
#include "Backoff.h"
25-
#include "ExecutorService.h"
26-
#include "LogUtils.h"
2721
#include "LookupDataResult.h"
2822
#include "LookupService.h"
29-
#include "SynchronizedHashMap.h"
23+
#include "NamespaceName.h"
24+
#include "RetryableOperationCache.h"
3025
#include "TopicName.h"
3126

3227
namespace pulsar {
3328

34-
class RetryableLookupService : public LookupService,
35-
public std::enable_shared_from_this<RetryableLookupService> {
29+
class RetryableLookupService : public LookupService {
3630
private:
37-
friend class PulsarFriend;
31+
friend class LookupServiceTest;
3832
struct PassKey {
3933
explicit PassKey() {}
4034
};
@@ -44,123 +38,58 @@ class RetryableLookupService : public LookupService,
4438
explicit RetryableLookupService(PassKey, Args&&... args)
4539
: RetryableLookupService(std::forward<Args>(args)...) {}
4640

41+
void close() override {
42+
lookupCache_->clear();
43+
partitionLookupCache_->clear();
44+
namespaceLookupCache_->clear();
45+
getSchemaCache_->clear();
46+
}
47+
4748
template <typename... Args>
4849
static std::shared_ptr<RetryableLookupService> create(Args&&... args) {
4950
return std::make_shared<RetryableLookupService>(PassKey{}, std::forward<Args>(args)...);
5051
}
5152

5253
LookupResultFuture getBroker(const TopicName& topicName) override {
53-
return executeAsync<LookupResult>("get-broker-" + topicName.toString(),
54-
[this, topicName] { return lookupService_->getBroker(topicName); });
54+
return lookupCache_->run("get-broker-" + topicName.toString(),
55+
[this, topicName] { return lookupService_->getBroker(topicName); });
5556
}
5657

5758
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
58-
return executeAsync<LookupDataResultPtr>(
59+
return partitionLookupCache_->run(
5960
"get-partition-metadata-" + topicName->toString(),
6061
[this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); });
6162
}
6263

6364
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
6465
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override {
65-
return executeAsync<NamespaceTopicsPtr>(
66+
return namespaceLookupCache_->run(
6667
"get-topics-of-namespace-" + nsName->toString(),
6768
[this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); });
6869
}
6970

7071
Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override {
71-
return executeAsync<SchemaInfo>("get-schema" + topicName->toString(), [this, topicName, version] {
72+
return getSchemaCache_->run("get-schema" + topicName->toString(), [this, topicName, version] {
7273
return lookupService_->getSchema(topicName, version);
7374
});
7475
}
7576

76-
template <typename T>
77-
Future<Result, T> executeAsync(const std::string& key, std::function<Future<Result, T>()> f) {
78-
Promise<Result, T> promise;
79-
executeAsyncImpl(key, f, promise, timeout_);
80-
return promise.getFuture();
81-
}
82-
8377
private:
8478
const std::shared_ptr<LookupService> lookupService_;
85-
const TimeDuration timeout_;
86-
Backoff backoff_;
87-
const ExecutorServiceProviderPtr executorProvider_;
88-
89-
SynchronizedHashMap<std::string, DeadlineTimerPtr> backoffTimers_;
79+
RetryableOperationCachePtr<LookupResult> lookupCache_;
80+
RetryableOperationCachePtr<LookupDataResultPtr> partitionLookupCache_;
81+
RetryableOperationCachePtr<NamespaceTopicsPtr> namespaceLookupCache_;
82+
RetryableOperationCachePtr<SchemaInfo> getSchemaCache_;
9083

9184
RetryableLookupService(std::shared_ptr<LookupService> lookupService, int timeoutSeconds,
9285
ExecutorServiceProviderPtr executorProvider)
9386
: lookupService_(lookupService),
94-
timeout_(boost::posix_time::seconds(timeoutSeconds)),
95-
backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
96-
boost::posix_time::milliseconds(0)),
97-
executorProvider_(executorProvider) {}
98-
99-
std::weak_ptr<RetryableLookupService> weak_from_this() noexcept { return shared_from_this(); }
100-
101-
// NOTE: Set the visibility to fix compilation error in GCC 6
102-
template <typename T>
103-
#ifndef _WIN32
104-
__attribute__((visibility("hidden")))
105-
#endif
106-
void
107-
executeAsyncImpl(const std::string& key, std::function<Future<Result, T>()> f, Promise<Result, T> promise,
108-
TimeDuration remainingTime) {
109-
auto weakSelf = weak_from_this();
110-
f().addListener([this, weakSelf, key, f, promise, remainingTime](Result result, const T& value) {
111-
auto self = weakSelf.lock();
112-
if (!self) {
113-
return;
114-
}
115-
116-
if (result == ResultOk) {
117-
backoffTimers_.remove(key);
118-
promise.setValue(value);
119-
} else if (result == ResultRetryable) {
120-
if (remainingTime.total_milliseconds() <= 0) {
121-
backoffTimers_.remove(key);
122-
promise.setFailed(ResultTimeout);
123-
return;
124-
}
125-
126-
DeadlineTimerPtr timerPtr;
127-
try {
128-
timerPtr = executorProvider_->get()->createDeadlineTimer();
129-
} catch (const std::runtime_error& e) {
130-
LOG_ERROR("Failed to retry lookup for " << key << ": " << e.what());
131-
promise.setFailed(ResultConnectError);
132-
return;
133-
}
134-
auto it = backoffTimers_.emplace(key, timerPtr);
135-
auto& timer = *(it.first->second);
136-
auto delay = std::min(backoff_.next(), remainingTime);
137-
timer.expires_from_now(delay);
138-
139-
auto nextRemainingTime = remainingTime - delay;
140-
LOG_INFO("Reschedule " << key << " for " << delay.total_milliseconds()
141-
<< " ms, remaining time: " << nextRemainingTime.total_milliseconds()
142-
<< " ms");
143-
timer.async_wait([this, weakSelf, key, f, promise,
144-
nextRemainingTime](const boost::system::error_code& ec) {
145-
auto self = weakSelf.lock();
146-
if (!self || ec) {
147-
if (self && ec != boost::asio::error::operation_aborted) {
148-
LOG_ERROR("The timer for " << key << " failed: " << ec.message());
149-
}
150-
// The lookup service has been destructed or the timer has been cancelled
151-
promise.setFailed(ResultTimeout);
152-
return;
153-
}
154-
executeAsyncImpl(key, f, promise, nextRemainingTime);
155-
});
156-
} else {
157-
backoffTimers_.remove(key);
158-
promise.setFailed(result);
159-
}
160-
});
161-
}
162-
163-
DECLARE_LOG_OBJECT()
87+
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeoutSeconds)),
88+
partitionLookupCache_(
89+
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeoutSeconds)),
90+
namespaceLookupCache_(
91+
RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider, timeoutSeconds)),
92+
getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider, timeoutSeconds)) {}
16493
};
16594

16695
} // namespace pulsar

lib/RetryableOperation.h

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <pulsar/Result.h>
22+
23+
#include <algorithm>
24+
#include <atomic>
25+
#include <functional>
26+
#include <memory>
27+
28+
#include "Backoff.h"
29+
#include "ExecutorService.h"
30+
#include "Future.h"
31+
#include "LogUtils.h"
32+
33+
namespace pulsar {
34+
35+
template <typename T>
36+
class RetryableOperation : public std::enable_shared_from_this<RetryableOperation<T>> {
37+
struct PassKey {
38+
explicit PassKey() {}
39+
};
40+
41+
RetryableOperation(const std::string& name, std::function<Future<Result, T>()>&& func, int timeoutSeconds,
42+
DeadlineTimerPtr timer)
43+
: name_(name),
44+
func_(std::move(func)),
45+
timeout_(boost::posix_time::seconds(timeoutSeconds)),
46+
backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
47+
boost::posix_time::milliseconds(0)),
48+
timer_(timer) {}
49+
50+
public:
51+
template <typename... Args>
52+
explicit RetryableOperation(PassKey, Args&&... args) : RetryableOperation(std::forward<Args>(args)...) {}
53+
54+
template <typename... Args>
55+
static std::shared_ptr<RetryableOperation<T>> create(Args&&... args) {
56+
return std::make_shared<RetryableOperation<T>>(PassKey{}, std::forward<Args>(args)...);
57+
}
58+
59+
Future<Result, T> run() {
60+
bool expected = false;
61+
if (!started_.compare_exchange_strong(expected, true)) {
62+
return promise_.getFuture();
63+
}
64+
return runImpl(timeout_);
65+
}
66+
67+
void cancel() {
68+
promise_.setFailed(ResultDisconnected);
69+
boost::system::error_code ec;
70+
timer_->cancel(ec);
71+
}
72+
73+
private:
74+
const std::string name_;
75+
std::function<Future<Result, T>()> func_;
76+
const TimeDuration timeout_;
77+
Backoff backoff_;
78+
Promise<Result, T> promise_;
79+
std::atomic_bool started_{false};
80+
DeadlineTimerPtr timer_;
81+
82+
Future<Result, T> runImpl(TimeDuration remainingTime) {
83+
std::weak_ptr<RetryableOperation<T>> weakSelf{this->shared_from_this()};
84+
func_().addListener([this, weakSelf, remainingTime](Result result, const T& value) {
85+
auto self = weakSelf.lock();
86+
if (!self) {
87+
return;
88+
}
89+
if (result == ResultOk) {
90+
promise_.setValue(value);
91+
return;
92+
}
93+
if (result != ResultRetryable) {
94+
promise_.setFailed(result);
95+
return;
96+
}
97+
if (remainingTime.total_milliseconds() <= 0) {
98+
promise_.setFailed(ResultTimeout);
99+
return;
100+
}
101+
102+
auto delay = std::min(backoff_.next(), remainingTime);
103+
timer_->expires_from_now(delay);
104+
105+
auto nextRemainingTime = remainingTime - delay;
106+
LOG_INFO("Reschedule " << name_ << " for " << delay.total_milliseconds()
107+
<< " ms, remaining time: " << nextRemainingTime.total_milliseconds()
108+
<< " ms");
109+
timer_->async_wait([this, weakSelf, nextRemainingTime](const boost::system::error_code& ec) {
110+
auto self = weakSelf.lock();
111+
if (!self) {
112+
return;
113+
}
114+
if (ec) {
115+
if (ec == boost::asio::error::operation_aborted) {
116+
LOG_DEBUG("Timer for " << name_ << " is cancelled");
117+
promise_.setFailed(ResultTimeout);
118+
} else {
119+
LOG_WARN("Timer for " << name_ << " failed: " << ec.message());
120+
}
121+
} else {
122+
LOG_DEBUG("Run operation " << name_ << ", remaining time: "
123+
<< nextRemainingTime.total_milliseconds() << " ms");
124+
runImpl(nextRemainingTime);
125+
}
126+
});
127+
});
128+
return promise_.getFuture();
129+
}
130+
131+
DECLARE_LOG_OBJECT()
132+
};
133+
134+
} // namespace pulsar

0 commit comments

Comments
 (0)