@@ -181,6 +181,268 @@ class ResumableAsyncStreamingReadWriteRpc {
181181 virtual future<void > Shutdown () = 0;
182182};
183183
184+ template <typename RequestType, typename ResponseType>
185+ class ResumableAsyncStreamingReadWriteRpcImpl
186+ : public ResumableAsyncStreamingReadWriteRpc<RequestType, ResponseType> {
187+ public:
188+ ResumableAsyncStreamingReadWriteRpcImpl (
189+ RetryPolicyFactory retry_factory,
190+ std::shared_ptr<BackoffPolicy const > backoff_policy, AsyncSleeper sleeper,
191+ AsyncStreamFactory<RequestType, ResponseType> stream_factory,
192+ StreamInitializer<RequestType, ResponseType> initializer)
193+ : retry_factory_(std::move(retry_factory)),
194+ backoff_policy_prototype_ (std::move(backoff_policy)),
195+ sleeper_(std::move(sleeper)),
196+ stream_factory_(std::move(stream_factory)),
197+ initializer_(std::move(initializer)) {}
198+
199+ ~ResumableAsyncStreamingReadWriteRpcImpl () override {
200+ future<void > shutdown = Shutdown ();
201+ if (!shutdown.is_ready ()) {
202+ GCP_LOG (WARNING) << " `Finish` must be called and finished before object "
203+ " goes out of scope." ;
204+ assert (false );
205+ }
206+ shutdown.get ();
207+ }
208+
209+ ResumableAsyncStreamingReadWriteRpcImpl (
210+ ResumableAsyncStreamingReadWriteRpc<RequestType, ResponseType>&&) =
211+ delete ;
212+ ResumableAsyncStreamingReadWriteRpcImpl& operator =(
213+ ResumableAsyncStreamingReadWriteRpc<RequestType, ResponseType>&&) =
214+ delete ;
215+
216+ future<Status> Start () override {
217+ future<Status> status_future;
218+ {
219+ std::lock_guard<std::mutex> g{mu_};
220+ status_future = status_promise_.get_future ();
221+ assert (!retry_promise_.has_value ());
222+ retry_promise_.emplace ();
223+ }
224+ auto retry_policy = retry_factory_ ();
225+ auto backoff_policy = backoff_policy_prototype_->clone ();
226+ Initialize (std::move (retry_policy), std::move (backoff_policy));
227+ return status_future;
228+ }
229+
230+ future<absl::optional<ResponseType>> Read () override {
231+ // TODO(18suresha) implement
232+ return make_ready_future (absl::optional<ResponseType>());
233+ }
234+
235+ future<bool > Write (RequestType const &, grpc::WriteOptions) override {
236+ // TODO(18suresha) implement
237+ return make_ready_future (false );
238+ }
239+
240+ future<void > Shutdown () override {
241+ promise<void > root_promise;
242+ future<void > root_future =
243+ ConfigureShutdownOrder (root_promise.get_future ());
244+
245+ root_promise.set_value ();
246+ return root_future;
247+ }
248+
249+ private:
250+ using UnderlyingStream =
251+ std::unique_ptr<AsyncStreamingReadWriteRpc<RequestType, ResponseType>>;
252+
253+ enum class State { kRetrying , kInitialized , kShutdown };
254+
255+ future<void > ConfigureShutdownOrder (future<void > root_future) {
256+ std::unique_lock<std::mutex> lk{mu_};
257+ switch (stream_state_) {
258+ case State::kShutdown :
259+ return make_ready_future ();
260+ case State::kRetrying :
261+ stream_state_ = State::kShutdown ;
262+ root_future = root_future.then (
263+ ChainFuture (retry_promise_->get_future ().then ([this ](future<void >) {
264+ std::unique_lock<std::mutex> lk{mu_};
265+ CompleteUnsatisfiedOps (Status (), lk);
266+ })));
267+ break ;
268+ case State::kInitialized :
269+ stream_state_ = State::kShutdown ;
270+ if (in_progress_read_) {
271+ root_future =
272+ root_future.then (ChainFuture (in_progress_read_->get_future ()));
273+ }
274+ if (in_progress_write_) {
275+ root_future =
276+ root_future.then (ChainFuture (in_progress_write_->get_future ()));
277+ }
278+ std::shared_ptr<AsyncStreamingReadWriteRpc<RequestType, ResponseType>>
279+ stream = std::move (stream_);
280+ CompleteUnsatisfiedOps (Status (), lk);
281+ root_future = root_future.then (
282+ [stream](future<void >) { return future<void >(stream->Finish ()); });
283+ }
284+ return root_future;
285+ }
286+
287+ void SetReadWriteFutures (std::unique_lock<std::mutex>& lk) {
288+ absl::optional<promise<void >> read_reinit_done;
289+ absl::optional<promise<void >> write_reinit_done;
290+
291+ {
292+ read_reinit_done.swap (read_reinit_done_);
293+ write_reinit_done.swap (write_reinit_done_);
294+ }
295+ lk.unlock ();
296+ if (read_reinit_done) read_reinit_done->set_value ();
297+ if (write_reinit_done) write_reinit_done->set_value ();
298+ lk.lock ();
299+ }
300+
301+ void CompleteUnsatisfiedOps (Status status, std::unique_lock<std::mutex>& lk) {
302+ SetReadWriteFutures (lk);
303+ lk.unlock ();
304+ status_promise_.set_value (std::move (status));
305+ lk.lock ();
306+ }
307+
308+ void FinishRetryPromise (std::unique_lock<std::mutex>& lk) {
309+ assert (retry_promise_.has_value ());
310+ promise<void > retry_promise = std::move (retry_promise_.value ());
311+ retry_promise_.reset ();
312+ lk.unlock ();
313+ retry_promise.set_value ();
314+ lk.lock ();
315+ }
316+
317+ void AttemptRetry (Status const & status,
318+ std::shared_ptr<RetryPolicy> retry_policy,
319+ std::shared_ptr<BackoffPolicy> backoff_policy) {
320+ {
321+ std::unique_lock<std::mutex> lk{mu_};
322+ if (stream_state_ == State::kShutdown ) {
323+ return FinishRetryPromise (lk);
324+ }
325+ assert (stream_state_ == State::kRetrying );
326+ }
327+
328+ if (!retry_policy->IsExhausted () && retry_policy->OnFailure (status)) {
329+ sleeper_ (backoff_policy->OnCompletion ())
330+ .then ([this , retry_policy, backoff_policy](future<void >) {
331+ Initialize (retry_policy, backoff_policy);
332+ });
333+ return ;
334+ }
335+
336+ std::unique_lock<std::mutex> lk{mu_};
337+ FinishRetryPromise (lk);
338+ if (stream_state_ == State::kShutdown ) return ;
339+ stream_state_ = State::kShutdown ;
340+
341+ CompleteUnsatisfiedOps (status, lk);
342+ }
343+
344+ void OnInitialize (StatusOr<UnderlyingStream> start_initialize_response,
345+ std::shared_ptr<RetryPolicy> retry_policy,
346+ std::shared_ptr<BackoffPolicy> backoff_policy) {
347+ if (!start_initialize_response.ok ()) {
348+ AttemptRetry (std::move (start_initialize_response.status ()), retry_policy,
349+ backoff_policy);
350+ return ;
351+ }
352+ auto stream = std::move (*start_initialize_response);
353+ std::unique_lock<std::mutex> lk{mu_};
354+ if (stream_state_ == State::kShutdown ) {
355+ stream->Finish ().then ([](future<Status>) {});
356+ } else {
357+ stream_ = std::move (stream);
358+ stream_state_ = State::kInitialized ;
359+ }
360+ FinishRetryPromise (lk);
361+ SetReadWriteFutures (lk);
362+ }
363+
364+ void Initialize (std::shared_ptr<RetryPolicy> retry_policy,
365+ std::shared_ptr<BackoffPolicy> backoff_policy) {
366+ {
367+ std::unique_lock<std::mutex> lk{mu_};
368+ if (stream_state_ == State::kShutdown ) {
369+ return FinishRetryPromise (lk);
370+ }
371+ }
372+
373+ // Since we maintain `stream_` as a `std::unique_ptr<>` as explained below,
374+ // we have to maintain the potential stream, `stream`, as only a
375+ // `std::unique_ptr<>` as well. We need to support C++11, so to enable the
376+ // following lambdas access to `stream`, we temporarily wrap the
377+ // `std::unique_ptr<>` by a `std::shared_ptr<>`.
378+ std::shared_ptr<UnderlyingStream> stream =
379+ std::make_shared<UnderlyingStream>(stream_factory_ ());
380+ (*stream)
381+ ->Start ()
382+ .then ([stream](future<bool > start_future) {
383+ if (start_future.get ()) return make_ready_future (Status ());
384+ return (*stream)->Finish ();
385+ })
386+ .then ([this , stream](future<Status> future_status) {
387+ Status status = future_status.get ();
388+ if (!status.ok ()) {
389+ return make_ready_future (StatusOr<UnderlyingStream>(status));
390+ }
391+ return initializer_ (std::move (*stream));
392+ })
393+ .then ([this , retry_policy, backoff_policy](
394+ future<StatusOr<UnderlyingStream>> initialize_future) {
395+ OnInitialize (initialize_future.get (), retry_policy, backoff_policy);
396+ });
397+ }
398+
399+ RetryPolicyFactory retry_factory_;
400+ std::shared_ptr<BackoffPolicy const > const backoff_policy_prototype_;
401+ AsyncSleeper const sleeper_;
402+ AsyncStreamFactory<RequestType, ResponseType> const stream_factory_;
403+ StreamInitializer<RequestType, ResponseType> const initializer_;
404+
405+ std::mutex mu_;
406+
407+ // We maintain `stream_` as a `std::unique_ptr<>` so that we ensure that we
408+ // are the sole owner of the stream. This is important to ensure that we
409+ // maintain the constraints of the stream's API, ex. we have knowledge of all
410+ // outstanding operations of `stream_`. A `std::shared_ptr<>` would allow the
411+ // stream to be leaked through `initializer_`, preventing us from having that
412+ // certainty.
413+ std::unique_ptr<AsyncStreamingReadWriteRpc<RequestType, ResponseType>>
414+ stream_; // ABSL_GUARDED_BY(mu_)
415+ State stream_state_ = State::kRetrying ; // ABSL_GUARDED_BY(mu_)
416+ // The below two member variables are to present a future to the user when
417+ // `Read` or `Write` finish with a failure. The returned future is only
418+ // completed when the invoked retry loop completes on success or permanent
419+ // error.
420+ absl::optional<promise<void >> read_reinit_done_; // ABSL_GUARDED_BY(mu_)
421+ absl::optional<promise<void >> write_reinit_done_; // ABSL_GUARDED_BY(mu_)
422+ // The below two member variables are promises that finish their future when
423+ // an internal `Read` or `Write` is finished respectively. This allows us to
424+ // perform retry logic (calling `Finish` internally) right when there are no
425+ // more outstanding `Read`s or `Write`s.
426+ absl::optional<promise<void >> in_progress_read_; // ABSL_GUARDED_BY(mu_)
427+ absl::optional<promise<void >> in_progress_write_; // ABSL_GUARDED_BY(mu_)
428+ absl::optional<promise<void >> retry_promise_; // ABSL_GUARDED_BY(mu_)
429+
430+ promise<Status> status_promise_;
431+ };
432+
433+ template <typename RequestType, typename ResponseType>
434+ std::unique_ptr<ResumableAsyncStreamingReadWriteRpc<RequestType, ResponseType>>
435+ MakeResumableAsyncStreamingReadWriteRpcImpl (
436+ RetryPolicyFactory retry_factory,
437+ std::shared_ptr<BackoffPolicy const > backoff_policy, AsyncSleeper sleeper,
438+ AsyncStreamFactory<RequestType, ResponseType> stream_factory,
439+ StreamInitializer<RequestType, ResponseType> initializer) {
440+ return absl::make_unique<
441+ ResumableAsyncStreamingReadWriteRpcImpl<RequestType, ResponseType>>(
442+ std::move (retry_factory), std::move (backoff_policy), std::move (sleeper),
443+ std::move (stream_factory), std::move (initializer));
444+ }
445+
184446} // namespace pubsublite_internal
185447GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
186448} // namespace cloud
0 commit comments