@@ -56,7 +56,7 @@ void ObjectDescriptorImpl::Start(
5656 OnRead (it, std::move (first_response));
5757 // Acquire lock and queue the background stream.
5858 lk.lock ();
59- AssurePendingStreamQueued ();
59+ AssurePendingStreamQueued (lk );
6060}
6161
6262void ObjectDescriptorImpl::Cancel () {
@@ -72,7 +72,8 @@ absl::optional<google::storage::v2::Object> ObjectDescriptorImpl::metadata()
7272 return metadata_;
7373}
7474
75- void ObjectDescriptorImpl::AssurePendingStreamQueued () {
75+ void ObjectDescriptorImpl::AssurePendingStreamQueued (
76+ std::unique_lock<std::mutex> const &) {
7677 if (pending_stream_.valid ()) return ;
7778 auto request = google::storage::v2::BidiReadObjectRequest{};
7879
@@ -92,7 +93,7 @@ void ObjectDescriptorImpl::MakeSubsequentStream() {
9293 return ;
9394 }
9495 // Proactively create a new stream if needed.
95- AssurePendingStreamQueued ();
96+ AssurePendingStreamQueued (lk );
9697 if (!pending_stream_.valid ()) return ;
9798 auto stream_future = std::move (pending_stream_);
9899 lk.unlock ();
@@ -119,7 +120,7 @@ void ObjectDescriptorImpl::MakeSubsequentStream() {
119120 auto new_it = self->stream_manager_ ->AddStream (std::move (read_stream));
120121
121122 // Now that we consumed pending_stream_, queue the next one immediately.
122- self->AssurePendingStreamQueued ();
123+ self->AssurePendingStreamQueued (lk );
123124
124125 lk.unlock ();
125126 self->OnRead (new_it, std::move (stream_result->first_response ));
@@ -263,7 +264,7 @@ void ObjectDescriptorImpl::OnFinish(StreamIterator it, Status const& status) {
263264 std::unique_lock<std::mutex> lk (mu_);
264265 stream_manager_->RemoveStreamAndNotifyRanges (it, status);
265266 // Since a stream died, we might want to ensure a replacement is queued.
266- AssurePendingStreamQueued ();
267+ AssurePendingStreamQueued (lk );
267268}
268269
269270void ObjectDescriptorImpl::Resume (StreamIterator it,
0 commit comments