Skip to content

Commit c6f614d

Browse files
authored
feat(pubsublite): resumable async streaming read write rpc (#8233)
1 parent 6ce79bc commit c6f614d

File tree

2 files changed

+788
-5
lines changed

2 files changed

+788
-5
lines changed

google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h

Lines changed: 161 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,54 @@ class ResumableAsyncStreamingReadWriteRpcImpl
228228
}
229229

230230
future<absl::optional<ResponseType>> Read() override {
231-
// TODO(18suresha) implement
232-
return make_ready_future(absl::optional<ResponseType>());
231+
future<absl::optional<ResponseType>> read_future;
232+
233+
{
234+
std::lock_guard<std::mutex> g{mu_};
235+
switch (stream_state_) {
236+
case State::kShutdown:
237+
return make_ready_future(absl::optional<ResponseType>());
238+
case State::kRetrying:
239+
assert(!read_reinit_done_.has_value());
240+
read_reinit_done_.emplace();
241+
return read_reinit_done_->get_future().then(
242+
[](future<void>) { return absl::optional<ResponseType>(); });
243+
case State::kInitialized:
244+
read_future = stream_->Read();
245+
assert(!in_progress_read_.has_value());
246+
in_progress_read_.emplace();
247+
}
248+
}
249+
250+
return read_future.then(
251+
[this](future<absl::optional<ResponseType>> optional_response_future) {
252+
return OnReadFutureFinish(optional_response_future.get());
253+
});
233254
}
234255

235-
future<bool> Write(RequestType const&, grpc::WriteOptions) override {
236-
// TODO(18suresha) implement
237-
return make_ready_future(false);
256+
future<bool> Write(RequestType const& r, grpc::WriteOptions o) override {
257+
future<bool> write_future;
258+
259+
{
260+
std::lock_guard<std::mutex> g{mu_};
261+
switch (stream_state_) {
262+
case State::kShutdown:
263+
return make_ready_future(false);
264+
case State::kRetrying:
265+
assert(!write_reinit_done_.has_value());
266+
write_reinit_done_.emplace();
267+
return write_reinit_done_->get_future().then(
268+
[](future<void>) { return false; });
269+
case State::kInitialized:
270+
write_future = stream_->Write(r, o);
271+
assert(!in_progress_write_.has_value());
272+
in_progress_write_.emplace();
273+
}
274+
}
275+
276+
return write_future.then([this](future<bool> write_fu) {
277+
return OnWriteFutureFinish(write_fu.get());
278+
});
238279
}
239280

240281
future<void> Shutdown() override {
@@ -252,6 +293,121 @@ class ResumableAsyncStreamingReadWriteRpcImpl
252293

253294
enum class State { kRetrying, kInitialized, kShutdown };
254295

296+
future<absl::optional<ResponseType>> OnReadFutureFinish(
297+
absl::optional<ResponseType> optional_response) {
298+
promise<void> in_progress_read(null_promise_t{});
299+
future<void> read_reinit_done;
300+
bool shutdown;
301+
302+
{
303+
std::lock_guard<std::mutex> g{mu_};
304+
assert(in_progress_read_.has_value());
305+
in_progress_read = std::move(*in_progress_read_);
306+
in_progress_read_.reset();
307+
shutdown = stream_state_ == State::kShutdown;
308+
if (!optional_response.has_value() && !shutdown) {
309+
assert(!read_reinit_done_.has_value());
310+
read_reinit_done_.emplace();
311+
read_reinit_done = read_reinit_done_->get_future();
312+
}
313+
}
314+
315+
in_progress_read.set_value();
316+
317+
if (shutdown) {
318+
return make_ready_future(absl::optional<ResponseType>());
319+
}
320+
321+
if (optional_response.has_value()) {
322+
return make_ready_future(std::move(optional_response));
323+
}
324+
325+
ReadWriteRetryFailedStream();
326+
327+
return read_reinit_done.then(
328+
[](future<void>) { return absl::optional<ResponseType>(); });
329+
}
330+
331+
future<bool> OnWriteFutureFinish(bool write_response) {
332+
promise<void> in_progress_write(null_promise_t{});
333+
future<void> write_reinit_done;
334+
bool shutdown;
335+
336+
{
337+
std::lock_guard<std::mutex> g{mu_};
338+
assert(in_progress_write_.has_value());
339+
in_progress_write = std::move(*in_progress_write_);
340+
in_progress_write_.reset();
341+
shutdown = stream_state_ == State::kShutdown;
342+
if (!write_response && !shutdown) {
343+
assert(!write_reinit_done_.has_value());
344+
write_reinit_done_.emplace();
345+
write_reinit_done = write_reinit_done_->get_future();
346+
}
347+
}
348+
349+
in_progress_write.set_value();
350+
351+
if (shutdown) return make_ready_future(false);
352+
353+
if (write_response) return make_ready_future(true);
354+
355+
ReadWriteRetryFailedStream();
356+
357+
return write_reinit_done.then([](future<void>) { return false; });
358+
}
359+
360+
void ReadWriteRetryFailedStream() {
361+
promise<void> root;
362+
{
363+
std::lock_guard<std::mutex> g{mu_};
364+
if (stream_state_ != State::kInitialized) return;
365+
366+
stream_state_ = State::kRetrying;
367+
assert(!retry_promise_.has_value());
368+
retry_promise_.emplace();
369+
370+
// Assuming that a `Read` fails:
371+
// If an outstanding operation is present, we can't enter the retry
372+
// loop, so we defer it until the outstanding `Write` finishes at
373+
// which point we can enter the retry loop. Since we will return
374+
// `reinit_done_`, we guarantee that another operation of the same type
375+
// is not called while we're waiting for the outstanding operation to
376+
// finish and the retry loop to finish afterward.
377+
378+
future<void> root_future = root.get_future();
379+
380+
// at most one of these will be set
381+
if (in_progress_read_.has_value()) {
382+
root_future =
383+
root_future.then(ChainFuture(in_progress_read_->get_future()));
384+
}
385+
if (in_progress_write_.has_value()) {
386+
root_future =
387+
root_future.then(ChainFuture(in_progress_write_->get_future()));
388+
}
389+
390+
root_future.then([this](future<void>) { FinishOnStreamFail(); });
391+
}
392+
393+
root.set_value();
394+
}
395+
396+
void FinishOnStreamFail() {
397+
future<Status> fail_finish;
398+
{
399+
std::lock_guard<std::mutex> g{mu_};
400+
fail_finish = stream_->Finish();
401+
}
402+
fail_finish.then([this](future<Status> finish_status) {
403+
// retry policy refactor
404+
auto retry_policy = retry_factory_();
405+
auto backoff_policy = backoff_policy_prototype_->clone();
406+
AttemptRetry(finish_status.get(), std::move(retry_policy),
407+
std::move(backoff_policy));
408+
});
409+
}
410+
255411
future<void> ConfigureShutdownOrder(future<void> root_future) {
256412
std::unique_lock<std::mutex> lk{mu_};
257413
switch (stream_state_) {

0 commit comments

Comments
 (0)