2121
2222#include " Firestore/core/src/firebase/firestore/remote/grpc_connection.h"
2323#include " Firestore/core/src/firebase/firestore/remote/grpc_util.h"
24+ #include " Firestore/core/src/firebase/firestore/util/log.h"
2425
2526namespace firebase {
2627namespace firestore {
@@ -96,6 +97,7 @@ GrpcStream::GrpcStream(
9697}
9798
9899GrpcStream::~GrpcStream () {
100+ LOG_DEBUG (" GrpcStream('%s'): destroying stream" , this );
99101 HARD_ASSERT (completions_.empty (),
100102 " GrpcStream is being destroyed without proper shutdown" );
101103 MaybeUnregister ();
@@ -156,11 +158,15 @@ void GrpcStream::MaybeWrite(absl::optional<BufferedWrite> maybe_write) {
156158}
157159
158160void GrpcStream::FinishImmediately () {
161+ LOG_DEBUG (" GrpcStream('%s'): finishing without notifying observers" , this );
162+
159163 Shutdown ();
160164 UnsetObserver ();
161165}
162166
163167void GrpcStream::FinishAndNotify (const Status& status) {
168+ LOG_DEBUG (" GrpcStream('%s'): finishing and notifying observers" , this );
169+
164170 Shutdown ();
165171
166172 if (observer_) {
@@ -173,22 +179,30 @@ void GrpcStream::FinishAndNotify(const Status& status) {
173179}
174180
175181void GrpcStream::Shutdown () {
182+ LOG_DEBUG (" GrpcStream('%s'): shutting down; completions: %s, is finished: %s" ,
183+ this , completions_.size (), is_grpc_call_finished_);
184+
176185 MaybeUnregister ();
177- if (completions_.empty ()) {
178- // Nothing to cancel -- either the call was already finished, or it has
179- // never been started.
180- return ;
186+
187+ // If completions are empty but the call hasn't been finished, it means this
188+ // stream has never started. Calling `Finish` on the underlying gRPC call is
189+ // invalid if it wasn't started previously.
190+ if (!completions_.empty () && !is_grpc_call_finished_) {
191+ // Important: during normal operation, the stream always has a pending read
192+ // operation, so `Shutdown` would hang indefinitely if we didn't cancel the
193+ // `context_`. However, if the stream has already failed, avoid canceling
194+ // the context to avoid overwriting the status captured during the
195+ // `OnOperationFailed`.
196+
197+ context_->TryCancel ();
198+ FinishGrpcCall ({});
181199 }
182200
183- // Important: since the stream always has a pending read operation,
184- // cancellation has to be called, or else the read would hang forever, and
185- // finish operation will never get completed.
186- // (on the other hand, when an operation fails, cancellation should not be
187- // called, otherwise the real failure cause will be overwritten by status
188- // "canceled".)
189- context_->TryCancel ();
190- FinishCall ({});
191- // Wait until "finish" is off the queue.
201+ // Drain the completions -- `Shutdown` guarantees to bring the stream into
202+ // destructible state. Two possibilities here:
203+ // - completions are empty -- nothing to block on;
204+ // - the only completion is "finish" (whether enqueued by this function or
205+ // previously) -- "finish" is a very fast operation.
192206 FastFinishCompletionsBlocking ();
193207}
194208
@@ -199,7 +213,12 @@ void GrpcStream::MaybeUnregister() {
199213 }
200214}
201215
202- void GrpcStream::FinishCall (const OnSuccess& callback) {
216+ void GrpcStream::FinishGrpcCall (const OnSuccess& callback) {
217+ LOG_DEBUG (" GrpcStream('%s'): finishing the underlying call" , this );
218+
219+ HARD_ASSERT (!is_grpc_call_finished_, " FinishGrpcCall called twice" );
220+ is_grpc_call_finished_ = true ;
221+
203222 // All completions issued by this call must be taken off the queue before
204223 // finish operation can be enqueued.
205224 FastFinishCompletionsBlocking ();
@@ -208,6 +227,9 @@ void GrpcStream::FinishCall(const OnSuccess& callback) {
208227}
209228
210229void GrpcStream::FastFinishCompletionsBlocking () {
230+ LOG_DEBUG (" GrpcStream('%s'): fast finishing %s completion(s)" , this ,
231+ completions_.size ());
232+
211233 // TODO(varconst): reset buffered_writer_? Should not be necessary, because it
212234 // should never be called again after a call to Finish.
213235
@@ -226,30 +248,35 @@ void GrpcStream::FastFinishCompletionsBlocking() {
226248
227249bool GrpcStream::WriteAndFinish (grpc::ByteBuffer&& message) {
228250 bool did_last_write = false ;
251+ if (!is_grpc_call_finished_) {
252+ did_last_write = TryLastWrite (std::move (message));
253+ }
254+ FinishImmediately ();
255+ return did_last_write;
256+ }
229257
258+ bool GrpcStream::TryLastWrite (grpc::ByteBuffer&& message) {
230259 absl::optional<BufferedWrite> maybe_write =
231260 buffered_writer_.EnqueueWrite (std::move (message));
232261 // Only bother with the last write if there is no active write at the moment.
233- if (maybe_write) {
234- BufferedWrite last_write = std::move (maybe_write).value ();
235- GrpcCompletion* completion = NewCompletion (Type::Write, {});
236- *completion->message () = last_write.message ;
237- call_->WriteLast (*completion->message (), grpc::WriteOptions{}, completion);
238-
239- // Empirically, the write normally takes less than a millisecond to finish
240- // (both with and without network connection), and never more than several
241- // dozen milliseconds. Nevertheless, ensure `WriteAndFinish` doesn't hang if
242- // there happen to be circumstances under which the write may block
243- // indefinitely (in that case, rely on the fact that canceling a gRPC call
244- // makes all pending operations come back from the queue quickly).
245- auto status = completion->WaitUntilOffQueue (std::chrono::milliseconds (500 ));
246- if (status == std::future_status::ready) {
247- did_last_write = true ;
248- }
262+ if (!maybe_write) {
263+ return false ;
249264 }
250265
251- FinishImmediately ();
252- return did_last_write;
266+ BufferedWrite last_write = std::move (maybe_write).value ();
267+ GrpcCompletion* completion = NewCompletion (Type::Write, {});
268+ *completion->message () = last_write.message ;
269+ call_->WriteLast (*completion->message (), grpc::WriteOptions{}, completion);
270+
271+ // Empirically, the write normally takes less than a millisecond to finish
272+ // (both with and without network connection), and never more than several
273+ // dozen milliseconds. Nevertheless, ensure `WriteAndFinish` doesn't hang if
274+ // there happen to be circumstances under which the write may block
275+ // indefinitely (in that case, rely on the fact that canceling a gRPC call
276+ // makes all pending operations come back from the queue quickly).
277+
278+ auto status = completion->WaitUntilOffQueue (std::chrono::milliseconds (500 ));
279+ return status == std::future_status::ready;
253280}
254281
255282GrpcStream::Metadata GrpcStream::GetResponseHeaders () const {
@@ -277,7 +304,13 @@ void GrpcStream::OnWrite() {
277304}
278305
279306void GrpcStream::OnOperationFailed () {
280- FinishCall ([this ](const GrpcCompletion* completion) {
307+ if (is_grpc_call_finished_) {
308+ // If a finish operation has been enqueued already (possibly by a previous
309+ // failed operation), there's nothing to do.
310+ return ;
311+ }
312+
313+ FinishGrpcCall ([this ](const GrpcCompletion* completion) {
281314 Status status = ConvertStatus (*completion->status ());
282315 FinishAndNotify (status);
283316 });
@@ -303,6 +336,8 @@ GrpcCompletion* GrpcStream::NewCompletion(Type tag,
303336 } else {
304337 // Use the same error-handling for all operations; all errors are
305338 // unrecoverable.
339+ LOG_DEBUG (" GrpcStream('%s'): operation of type %s failed" , this ,
340+ completion->type ());
306341 OnOperationFailed ();
307342 }
308343 };
0 commit comments