Skip to content

Commit a49d569

Browse files
authored
feat(pubsublite): resumable async read write rpc stream boilerplate (#8441)
1 parent dce3c95 commit a49d569

File tree

11 files changed

+436
-1
lines changed

11 files changed

+436
-1
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ compile_commands.json
2222
# Ignore IDEA / IntelliJ files
2323
.idea/
2424
cmake-build-*/
25+
.clwb/
2526

2627
# Ignore Visual Studio Code files
2728
.vsbuild/

ci/cloudbuild/builds/coverage.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ instrumented_patterns=(
3131
"/google/cloud/bigtable[/:]"
3232
"/google/cloud/examples[/:]"
3333
"/google/cloud/pubsub[/:]"
34+
"/google/cloud/pubsublite[/:]"
3435
"/google/cloud/spanner[/:]"
3536
"/google/cloud/storage[/:]"
3637
"/generator[/:]"

google/cloud/pubsublite/BUILD.bazel

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,20 @@ cc_library(
4444
],
4545
)
4646

47+
load(":pubsublite_testing.bzl", "pubsublite_testing_hdrs", "pubsublite_testing_srcs")
48+
49+
cc_library(
50+
name = "pubsublite_testing",
51+
srcs = pubsublite_testing_srcs,
52+
hdrs = pubsublite_testing_hdrs,
53+
visibility = ["//:__pkg__"],
54+
deps = [
55+
":google_cloud_cpp_pubsublite",
56+
"//google/cloud:google_cloud_cpp_common",
57+
"//google/cloud:google_cloud_cpp_grpc_utils",
58+
],
59+
)
60+
4761
load(":pubsublite_unit_tests.bzl", "pubsublite_unit_tests")
4862

4963
[

google/cloud/pubsublite/CMakeLists.txt

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ add_library(
9595
internal/cursor_stub.h
9696
internal/cursor_stub_factory.cc
9797
internal/cursor_stub_factory.h
98+
internal/futures.h
9899
internal/partition_assignment_auth_decorator.cc
99100
internal/partition_assignment_auth_decorator.h
100101
internal/partition_assignment_logging_decorator.cc
@@ -115,6 +116,7 @@ add_library(
115116
internal/publisher_stub.h
116117
internal/publisher_stub_factory.cc
117118
internal/publisher_stub_factory.h
119+
internal/resumable_async_streaming_read_write_rpc.h
118120
internal/stream_factory.h
119121
internal/subscriber_auth_decorator.cc
120122
internal/subscriber_auth_decorator.h
@@ -207,6 +209,26 @@ function (google_cloud_cpp_pubsublite_client_define_tests)
207209
# the GTest::gmock target, and the target names are also weird.
208210
find_package(GTest CONFIG REQUIRED)
209211

212+
add_library(pubsublite_testing INTERFACE)
213+
target_sources(
214+
pubsublite_testing
215+
INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/testing/mock_async_reader_writer.h
216+
${CMAKE_CURRENT_SOURCE_DIR}/testing/mock_backoff_policy.h
217+
${CMAKE_CURRENT_SOURCE_DIR}/testing/mock_retry_policy.h)
218+
219+
target_link_libraries(
220+
pubsublite_testing
221+
INTERFACE google-cloud-cpp::experimental-pubsublite GTest::gmock_main
222+
GTest::gmock GTest::gtest)
223+
target_include_directories(
224+
pubsublite_testing
225+
INTERFACE $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}>
226+
$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}>
227+
$<INSTALL_INTERFACE:include>)
228+
target_compile_options(pubsublite_testing
229+
INTERFACE ${GOOGLE_CLOUD_CPP_EXCEPTIONS_FLAG})
230+
create_bazel_config(pubsublite_testing YEAR "2022")
231+
210232
set(pubsublite_unit_tests # cmake-format: sort
211233
endpoint_test.cc internal/stream_factory_test.cc)
212234

@@ -218,7 +240,8 @@ function (google_cloud_cpp_pubsublite_client_define_tests)
218240
google_cloud_cpp_add_executable(target "pubsublite" "${fname}")
219241
target_link_libraries(
220242
${target}
221-
PRIVATE google_cloud_cpp_testing
243+
PRIVATE pubsublite_testing
244+
google_cloud_cpp_testing
222245
google_cloud_cpp_testing_grpc
223246
google_cloud_cpp_pubsublite
224247
google_cloud_cpp_pubsublite_mocks

google/cloud/pubsublite/google_cloud_cpp_pubsublite.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ google_cloud_cpp_pubsublite_hdrs = [
3535
"internal/cursor_metadata_decorator.h",
3636
"internal/cursor_stub.h",
3737
"internal/cursor_stub_factory.h",
38+
"internal/futures.h",
3839
"internal/partition_assignment_auth_decorator.h",
3940
"internal/partition_assignment_logging_decorator.h",
4041
"internal/partition_assignment_metadata_decorator.h",
@@ -45,6 +46,7 @@ google_cloud_cpp_pubsublite_hdrs = [
4546
"internal/publisher_metadata_decorator.h",
4647
"internal/publisher_stub.h",
4748
"internal/publisher_stub_factory.h",
49+
"internal/resumable_async_streaming_read_write_rpc.h",
4850
"internal/stream_factory.h",
4951
"internal/subscriber_auth_decorator.h",
5052
"internal/subscriber_logging_decorator.h",
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_FUTURES_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_FUTURES_H
17+
18+
#include "google/cloud/future.h"
19+
20+
namespace google {
21+
namespace cloud {
22+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
23+
namespace pubsublite_internal {
24+
25+
template <typename T>
26+
struct ChainFutureImpl {
27+
template <typename U>
28+
// Note: this drops any exceptions contained in `future<U>`
29+
future<T> operator()(future<U>) {
30+
return std::move(f);
31+
}
32+
33+
future<T> f;
34+
};
35+
36+
/**
37+
* A helper to capture-by-move futures into a second future continuation.
38+
*
39+
* Given two futures `future<U> r` and `future<T> f` we often want to write:
40+
* @code
41+
* f.then([tmp = std::move(f)](future<T>) mutable { return std::move(tmp); }
42+
* @encode
43+
*
44+
* Unfortunately we cannot, as the project needs to support C++11. This is
45+
* a helper to avoid repetition of this pattern.
46+
*/
47+
template <typename T>
48+
ChainFutureImpl<T> ChainFuture(future<T> f) {
49+
return ChainFutureImpl<T>{std::move(f)};
50+
}
51+
52+
} // namespace pubsublite_internal
53+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
54+
} // namespace cloud
55+
} // namespace google
56+
57+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_FUTURES_H
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_RESUMABLE_ASYNC_STREAMING_READ_WRITE_RPC_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_RESUMABLE_ASYNC_STREAMING_READ_WRITE_RPC_H
17+
18+
#include "google/cloud/pubsublite/internal/futures.h"
19+
#include "google/cloud/async_streaming_read_write_rpc.h"
20+
#include "google/cloud/internal/backoff_policy.h"
21+
#include "google/cloud/internal/retry_policy.h"
22+
#include "google/cloud/log.h"
23+
#include "google/cloud/status_or.h"
24+
#include "google/cloud/version.h"
25+
#include <chrono>
26+
#include <memory>
27+
#include <utility>
28+
29+
namespace google {
30+
namespace cloud {
31+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
32+
namespace pubsublite_internal {
33+
34+
using google::cloud::internal::BackoffPolicy;
35+
using google::cloud::internal::RetryPolicy;
36+
37+
/**
38+
* `ResumableAsyncStreamingReadWriteRpc<ResponseType, RequestType>` uses
39+
* callables compatible with this `std::function<>` to create new streams.
40+
*/
41+
template <typename RequestType, typename ResponseType>
42+
using AsyncStreamFactory = std::function<
43+
std::unique_ptr<AsyncStreamingReadWriteRpc<RequestType, ResponseType>>()>;
44+
45+
/**
46+
* `ResumableAsyncStreamingReadWriteRpc<ResponseType, RequestType>` uses
47+
* callables compatible with this `std::function<>` to initialize a stream
48+
* from AsyncStreamFactory.
49+
*/
50+
template <typename RequestType, typename ResponseType>
51+
using StreamInitializer = std::function<future<StatusOr<
52+
std::unique_ptr<AsyncStreamingReadWriteRpc<RequestType, ResponseType>>>>(
53+
std::unique_ptr<AsyncStreamingReadWriteRpc<RequestType, ResponseType>>)>;
54+
55+
using AsyncSleeper = std::function<future<void>(std::chrono::milliseconds)>;
56+
57+
using RetryPolicyFactory = std::function<std::unique_ptr<RetryPolicy>()>;
58+
59+
/**
60+
* Defines the interface for resumable bidirectional streaming RPCs.
61+
*
62+
* Concrete instances of this class resume interrupted streaming RPCs after
63+
* transient failures. On such failures the concrete implementations would
64+
* typically create a new streaming RPC and call an asynchronous function to
65+
* to initialize the stream.
66+
*
67+
* While resuming a streaming RPC is automatic, callers of `Read` and `Write`
68+
* are notified when a new stream is created, as they may need to take action
69+
* when starting on a new stream.
70+
*
71+
* Example (sort of unrealistic) usage:
72+
* @code
73+
* using Underlying = std::unique_ptr<StreamingReadWriteRpc<Req, Res>>;
74+
*
75+
* // Initializes a non-resumable stream, potentially making many
76+
* // chained asynchronous calls.
77+
* future<StatusOr<Underlying>> Initialize(Underlying to_init);
78+
*
79+
* Status Example() {
80+
* std::unique_ptr<ResumableAsyncStreamingReadWriteRpc<Req, Res>> stream =
81+
* MakeResumableStream(&Initialize);
82+
* future<Status> final_status = stream->Start(); // 1
83+
* while (!final_status.is_ready()) {
84+
* if (!stream->Write(GetMessage1()).get()) continue;
85+
* if (!stream->Write(GetMessage2()).get()) continue;
86+
* auto response_1 = stream->Read().get();
87+
* if (!response_1.has_value()) continue;
88+
* ProcessResponse(*response_1);
89+
* auto response_2 = stream->Read().get();
90+
* if (!response_2.has_value()) continue;
91+
* ProcessResponse(*response_2);
92+
* resumable_stream.Shutdown().get();
93+
* return Status();
94+
* }
95+
* return final_status.get();
96+
* }
97+
* @endcode
98+
*/
99+
template <typename RequestType, typename ResponseType>
100+
class ResumableAsyncStreamingReadWriteRpc {
101+
public:
102+
virtual ~ResumableAsyncStreamingReadWriteRpc() = default;
103+
104+
/**
105+
* Start the streaming RPC.
106+
*
107+
* The future returned by this function is satisfied when the stream
108+
* is successfully shut down (in which case in contains an ok status),
109+
* or when the retry policies to resume the stream are exhausted. The
110+
* latter includes the case where the stream fails with a permanent
111+
* error.
112+
*
113+
* While the stream is usable immediately after this function returns,
114+
* any `Read()` or `Write()` calls will fail until the stream is initialized
115+
* successfully.
116+
*/
117+
virtual future<Status> Start() = 0;
118+
119+
/**
120+
* Read one response from the streaming RPC.
121+
*
122+
* @note Only **one** `Read()` operation may be pending at a time. The
123+
* application is responsible for waiting until any previous `Read()`
124+
* operations have completed before calling `Read()` again.
125+
*
126+
* Whether `Read()` can be called before a `Write()` operation is specified by
127+
* each service and RPC. Most services require at least one `Write()` call
128+
* before calling `Read()`. Many services may return more than one response
129+
* for a single `Write()` request. Each service and RPC specifies how to
130+
* discover if more responses will be forthcoming.
131+
*
132+
* The future returned by `Read` will be satisfied when the `Read` call on the
133+
* underlying stream successfully completes or when the internal retry loop
134+
* (un)successfully completes if the underlying call to `Read` fails.
135+
*
136+
* If the future is satisfied with an engaged `optional<>`, it holds a value
137+
* read from the current underlying GRPC stream. If the future is satisfied
138+
* with `nullopt`, the underlying stream may have changed or a permanent error
139+
* has happened. If the `Start` future is not satisfied, the user may call
140+
* `Read` again to read from a new underlying stream.
141+
*/
142+
virtual future<absl::optional<ResponseType>> Read() = 0;
143+
144+
/**
145+
* Write one request to the streaming RPC.
146+
*
147+
* @note Only **one** `Write()` operation may be pending at a time. The
148+
* application is responsible for waiting until any previous `Write()`
149+
* operations have completed before calling `Write()` again.
150+
*
151+
* Whether `Write()` can be called before waiting for a matching `Read()`
152+
* operation is specified by each service and RPC. Many services tolerate
153+
* multiple `Write()` calls before performing or at least receiving a `Read()`
154+
* response.
155+
*
156+
* The future returned by `Write` will be satisfied when the `Write` call on
157+
* the underlying stream successfully completes or when the internal retry
158+
* loop (un)successfully completes if the underlying call to `Write` fails.
159+
*
160+
* If the future is satisfied with `true`, a successful `Write` call was made
161+
* to the current underlying GRPC stream. If the future is satisfied with
162+
* `false`, the underlying stream may have changed or a permanent error has
163+
* happened. If the `Start` future is not satisfied, the user may call `Write`
164+
* again to write the value to a new underlying stream.
165+
*/
166+
virtual future<bool> Write(RequestType const&, grpc::WriteOptions) = 0;
167+
168+
/**
169+
* Finishes the streaming RPC.
170+
*
171+
* This will cause any outstanding `Read` or `Write` to fail. This may be
172+
* called while a `Read` or `Write` of an object of this class is outstanding.
173+
* Internally, the class will manage waiting on `Read` and `Write` calls on a
174+
* gRPC stream before calling `Finish` on its underlying stream as per
175+
* `google::cloud::AsyncStreamingReadWriteRpc`. If the class is currently in a
176+
* retry loop, this will terminate the retry loop and then satisfy the
177+
* returned future. If the class has a present internal outstanding `Read` or
178+
* `Write`, this call will satisfy the returned future only after the internal
179+
* `Read` and/or `Write` finish.
180+
*/
181+
virtual future<void> Shutdown() = 0;
182+
};
183+
184+
} // namespace pubsublite_internal
185+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
186+
} // namespace cloud
187+
} // namespace google
188+
189+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUBLITE_INTERNAL_RESUMABLE_ASYNC_STREAMING_READ_WRITE_RPC_H
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Copyright 2022 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
# DO NOT EDIT -- GENERATED BY CMake -- Change the CMakeLists.txt file if needed
16+
17+
"""Automatically generated source lists for pubsublite_testing - DO NOT EDIT."""
18+
19+
pubsublite_testing_hdrs = [
20+
"testing/mock_async_reader_writer.h",
21+
"testing/mock_backoff_policy.h",
22+
"testing/mock_retry_policy.h",
23+
]
24+
25+
pubsublite_testing_srcs = [
26+
]

0 commit comments

Comments
 (0)