Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit e615ce1

Browse files
authored
refactor: adapt async retry loop from Bigtable (#244)
1 parent a509602 commit e615ce1

11 files changed

+753
-4
lines changed

google/cloud/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ load(":google_cloud_cpp_grpc_utils_unit_tests.bzl", "google_cloud_cpp_grpc_utils
8888
":google_cloud_cpp_grpc_utils",
8989
"//google/cloud:google_cloud_cpp_common",
9090
"//google/cloud/testing_util:google_cloud_cpp_testing",
91+
"//google/cloud/testing_util:google_cloud_cpp_testing_grpc",
9192
"@com_github_grpc_grpc//:grpc++",
9293
"@com_google_googleapis//:bigtable_protos",
9394
"@com_google_googleapis//:grpc_utils_protos",

google/cloud/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC_UTILS)
328328
grpc_utils/grpc_error_delegate.h
329329
grpc_utils/version.h
330330
internal/async_read_stream_impl.h
331+
internal/async_retry_unary_rpc.h
331332
internal/background_threads_impl.cc
332333
internal/background_threads_impl.h
333334
internal/completion_queue_impl.cc
@@ -361,6 +362,7 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC_UTILS)
361362
completion_queue_test.cc
362363
connection_options_test.cc
363364
grpc_error_delegate_test.cc
365+
internal/async_retry_unary_rpc_test.cc
364366
internal/background_threads_impl_test.cc
365367
internal/pagination_range_test.cc)
366368

@@ -375,6 +377,7 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC_UTILS)
375377
target_link_libraries(
376378
${target}
377379
PRIVATE google_cloud_cpp_grpc_utils
380+
google_cloud_cpp_testing_grpc
378381
google_cloud_cpp_testing
379382
google_cloud_cpp_common
380383
googleapis-c++::bigtable_protos

google/cloud/google_cloud_cpp_grpc_utils.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ google_cloud_cpp_grpc_utils_hdrs = [
2727
"grpc_utils/grpc_error_delegate.h",
2828
"grpc_utils/version.h",
2929
"internal/async_read_stream_impl.h",
30+
"internal/async_retry_unary_rpc.h",
3031
"internal/background_threads_impl.h",
3132
"internal/completion_queue_impl.h",
3233
"internal/pagination_range.h",

google/cloud/google_cloud_cpp_grpc_utils_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ google_cloud_cpp_grpc_utils_unit_tests = [
2020
"completion_queue_test.cc",
2121
"connection_options_test.cc",
2222
"grpc_error_delegate_test.cc",
23+
"internal/async_retry_unary_rpc_test.cc",
2324
"internal/background_threads_impl_test.cc",
2425
"internal/pagination_range_test.cc",
2526
]
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_RETRY_UNARY_RPC_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_RETRY_UNARY_RPC_H
17+
18+
#include "google/cloud/completion_queue.h"
19+
#include "google/cloud/internal/completion_queue_impl.h"
20+
#include "google/cloud/internal/make_unique.h"
21+
#include "google/cloud/version.h"
22+
#include <google/protobuf/empty.pb.h>
23+
24+
namespace google {
25+
namespace cloud {
26+
inline namespace GOOGLE_CLOUD_CPP_NS {
27+
namespace internal {
28+
29+
/**
30+
* Make an asynchronous unary RPC with retries.
31+
*
32+
* This class creates a future<> that becomes satisfied when an asynchronous
33+
* operation either:
34+
*
35+
* - Succeeds.
36+
* - Fails with a non-retryable error.
37+
* - The retry policy expires.
38+
*
39+
* The class retries the operation, using a backoff policy to wait between
40+
* retries. The class does not block, it uses the completion queue to wait.
41+
*
42+
* @tparam AsyncCallType the type of the callable used to start the asynchronous
43+
* operation. This is typically a lambda that wraps both the `Client` object
44+
* and the member function to invoke.
45+
* @tparam RequestType the type of the request object.
46+
* @tparam IdempotencyPolicy the type of the idempotency policy.
47+
* @tparam Sig discover the signature and return type of `AsyncCallType`.
48+
* @tparam ResponseType the discovered response type for `AsyncCallType`.
49+
* @tparam validate_parameters validate the other template parameters.
50+
*/
51+
template <typename RPCBackoffPolicy, typename RPCRetryPolicy,
52+
typename AsyncCallType, typename RequestType>
53+
class RetryAsyncUnaryRpc {
54+
public:
55+
//@{
56+
/// @name Convenience aliases for the RPC request and response types.
57+
using Request = RequestType;
58+
using Response =
59+
typename AsyncCallResponseType<AsyncCallType, RequestType>::type;
60+
//@}
61+
62+
/**
63+
* Start the asynchronous retry loop.
64+
*
65+
* @param location typically the name of the function that created this
66+
* asynchronous retry loop.
67+
* @param rpc_retry_policy controls the number of retries, and what errors are
68+
* considered retryable.
69+
* @param rpc_backoff_policy determines the wait time between retries.
70+
* @param idempotent_policy determines if a request is retryable.
71+
* @param metadata_update_policy controls how to update the metadata fields in
72+
* the request.
73+
* @param async_call the callable to start a new asynchronous operation.
74+
* @param request the parameters of the request.
75+
* @param cq the completion queue where the retry loop is executed.
76+
* @return a future that becomes satisfied when (a) one of the retry attempts
77+
* is successful, or (b) one of the retry attempts fails with a
78+
* non-retryable error, or (c) one of the retry attempts fails with a
79+
* retryable error, but the request is non-idempotent, or (d) the
80+
* retry policy is expired.
81+
*/
82+
static future<StatusOr<Response>> Start(
83+
CompletionQueue cq, char const* location,
84+
std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
85+
std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy, bool is_idempotent,
86+
AsyncCallType async_call, Request request) {
87+
std::shared_ptr<RetryAsyncUnaryRpc> self(new RetryAsyncUnaryRpc(
88+
location, std::move(rpc_retry_policy), std::move(rpc_backoff_policy),
89+
is_idempotent, std::move(async_call), std::move(request)));
90+
auto future = self->final_result_.get_future();
91+
self->StartIteration(self, std::move(cq));
92+
return future;
93+
}
94+
95+
private:
96+
// The constructor is private because we always want to wrap the object in
97+
// a shared pointer. The lifetime is controlled by any pending operations in
98+
// the CompletionQueue.
99+
RetryAsyncUnaryRpc(char const* location,
100+
std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
101+
std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
102+
bool is_idempotent, AsyncCallType async_call,
103+
Request request)
104+
: location_(location),
105+
rpc_retry_policy_(std::move(rpc_retry_policy)),
106+
rpc_backoff_policy_(std::move(rpc_backoff_policy)),
107+
is_idempotent_(is_idempotent),
108+
async_call_(std::move(async_call)),
109+
request_(std::move(request)) {}
110+
111+
/// The callback for a completed request, successful or not.
112+
static void OnCompletion(std::shared_ptr<RetryAsyncUnaryRpc> self,
113+
CompletionQueue cq, StatusOr<Response> result) {
114+
if (result) {
115+
self->final_result_.set_value(std::move(result));
116+
return;
117+
}
118+
if (!self->is_idempotent_) {
119+
self->final_result_.set_value(self->DetailedStatus(
120+
"non-idempotent operation failed", result.status()));
121+
return;
122+
}
123+
if (!self->rpc_retry_policy_->OnFailure(result.status())) {
124+
auto failure_description =
125+
RPCRetryPolicy::RetryableTraits::IsPermanentFailure(result.status())
126+
? "permanent failure"
127+
: "retry policy exhausted";
128+
self->final_result_.set_value(
129+
self->DetailedStatus(failure_description, result.status()));
130+
return;
131+
}
132+
cq.MakeRelativeTimer(self->rpc_backoff_policy_->OnCompletion())
133+
.then([self, cq](future<StatusOr<std::chrono::system_clock::time_point>>
134+
result) {
135+
if (auto tp = result.get()) {
136+
self->StartIteration(self, cq);
137+
} else {
138+
self->final_result_.set_value(
139+
self->DetailedStatus("timer error", tp.status()));
140+
}
141+
});
142+
}
143+
144+
/// The callback to start another iteration of the retry loop.
145+
static void StartIteration(std::shared_ptr<RetryAsyncUnaryRpc> self,
146+
CompletionQueue cq) {
147+
auto context =
148+
::google::cloud::internal::make_unique<grpc::ClientContext>();
149+
150+
cq.MakeUnaryRpc(self->async_call_, self->request_, std::move(context))
151+
.then([self, cq](future<StatusOr<Response>> fut) {
152+
self->OnCompletion(self, cq, fut.get());
153+
});
154+
}
155+
156+
/// Generate an error message
157+
Status DetailedStatus(char const* context, Status const& status) {
158+
std::string full_message = location_;
159+
full_message += context;
160+
full_message += ", last error=";
161+
full_message += status.message();
162+
return Status(status.code(), std::move(full_message));
163+
}
164+
165+
char const* location_;
166+
std::unique_ptr<RPCRetryPolicy> rpc_retry_policy_;
167+
std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy_;
168+
bool is_idempotent_;
169+
170+
AsyncCallType async_call_;
171+
Request request_;
172+
173+
promise<StatusOr<Response>> final_result_;
174+
};
175+
176+
/**
177+
* Automatically deduce the type for `RetryAsyncUnaryRpc` and start the
178+
* asynchronous retry loop.
179+
*
180+
* @param location typically the name of the function that created this
181+
* asynchronous retry loop.
182+
* @param rpc_retry_policy controls the number of retries, and what errors are
183+
* considered retryable.
184+
* @param rpc_backoff_policy determines the wait time between retries.
185+
* @param idempotent_policy determines if a request is retryable.
186+
* @param metadata_update_policy controls how to update the metadata fields in
187+
* the request.
188+
* @param async_call the callable to start a new asynchronous operation.
189+
* @param request the parameters of the request.
190+
* @param cq the completion queue where the retry loop is executed.
191+
*
192+
* @return a future that becomes satisfied when (a) one of the retry attempts
193+
* is successful, or (b) one of the retry attempts fails with a
194+
* non-retryable error, or (c) one of the retry attempts fails with a
195+
* retryable error, but the request is non-idempotent, or (d) the
196+
* retry policy is expired.
197+
*/
198+
template <typename RPCBackoffPolicy, typename RPCRetryPolicy,
199+
typename AsyncCallType, typename RequestType,
200+
typename async_call_t = typename std::decay<AsyncCallType>::type,
201+
typename request_t = typename std::decay<RequestType>::type,
202+
typename std::enable_if<
203+
google::cloud::internal::is_invocable<
204+
async_call_t, grpc::ClientContext*, request_t const&,
205+
grpc::CompletionQueue*>::value,
206+
int>::type = 0>
207+
future<StatusOr<typename AsyncCallResponseType<async_call_t, request_t>::type>>
208+
StartRetryAsyncUnaryRpc(CompletionQueue cq, char const* location,
209+
std::unique_ptr<RPCRetryPolicy> rpc_retry_policy,
210+
std::unique_ptr<RPCBackoffPolicy> rpc_backoff_policy,
211+
bool is_idempotent, AsyncCallType&& async_call,
212+
RequestType&& request) {
213+
return RetryAsyncUnaryRpc<RPCBackoffPolicy, RPCRetryPolicy, async_call_t,
214+
request_t>::Start(std::move(cq), location,
215+
std::move(rpc_retry_policy),
216+
std::move(rpc_backoff_policy),
217+
is_idempotent,
218+
std::forward<AsyncCallType>(
219+
async_call),
220+
std::forward<RequestType>(
221+
request));
222+
}
223+
224+
} // namespace internal
225+
} // namespace GOOGLE_CLOUD_CPP_NS
226+
} // namespace cloud
227+
} // namespace google
228+
229+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_RETRY_UNARY_RPC_H

0 commit comments

Comments
 (0)