Skip to content

Commit e68ce08

Browse files
committed
quic: Fix race condition in Stream::Outbound::Pull
1 parent bde8c32 commit e68ce08

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

src/quic/streams.cc

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -523,8 +523,12 @@ class Stream::Outbound final : public MemoryRetainer {
523523
// that the pull is sync but allow for it to be async.
524524
int ret = reader_->Pull(
525525
[this](auto status, auto vecs, auto count, auto done) {
526-
// Always make sure next_pending_ is false when we're done.
527-
auto on_exit = OnScopeLeave([this] { next_pending_ = false; });
526+
const bool last_next_pending_state = next_pending_;
527+
next_pending_ = false;
528+
// this ensures that next_pending is reset to false
529+
// Note that for example ResumeStream below, may again call Pull
530+
// so we have to erase next_pending_ to not block this call.
531+
// Therefore the previous OnScopeLeave lead to a race condition.
528532

529533
// We need to hold a reference to stream and session
530534
// so that it can not go away during the next calls.
@@ -536,10 +540,10 @@ class Stream::Outbound final : public MemoryRetainer {
536540
DCHECK_NE(status, bob::Status::STATUS_WAIT);
537541

538542
if (status < 0) {
539-
// If next_pending_ is true then a pull from the reader ended up
543+
// If next_pending_ was true then a pull from the reader ended up
540544
// being asynchronous, our stream is blocking waiting for the data,
541545
// but we have an error! oh no! We need to error the stream.
542-
if (next_pending_) {
546+
if (last_next_pending_state) {
543547
stream->Destroy(
544548
QuicError::ForNgtcp2Error(NGTCP2_INTERNAL_ERROR));
545549
// We do not need to worry about calling MarkErrored in this case
@@ -558,7 +562,8 @@ class Stream::Outbound final : public MemoryRetainer {
558562
// Here, there is no more data to read, but we will might have data
559563
// in the uncommitted queue. We'll resume the stream so that the
560564
// session will try to read from it again.
561-
if (next_pending_) {
565+
if (last_next_pending_state) {
566+
fprintf(stderr, "next_pending ResumeStream EOS\n");
562567
session->ResumeStream(stream_->id());
563568
}
564569
return;
@@ -578,11 +583,11 @@ class Stream::Outbound final : public MemoryRetainer {
578583
// bytes in the queue.
579584
Append(vecs, count, std::move(done));
580585

581-
// If next_pending_ is true, then a pull from the reader ended up
586+
// If next_pending_ was true, then a pull from the reader ended up
582587
// being asynchronous, our stream is blocking waiting for the data.
583588
// Now that we have data, let's resume the stream so the session will
584589
// pull from it again.
585-
if (next_pending_) {
590+
if (last_next_pending_state) {
586591
stream->session().ResumeStream(stream_->id());
587592
}
588593
},

0 commit comments

Comments
 (0)