diff --git a/src/workerd/api/streams/queue.c++ b/src/workerd/api/streams/queue.c++ index 48aa69a8018..ab651d7c90c 100644 --- a/src/workerd/api/streams/queue.c++ +++ b/src/workerd/api/streams/queue.c++ @@ -159,8 +159,9 @@ void ValueQueue::handlePush( // Otherwise, pop the next pending read and resolve it. There should be nothing in the queue. KJ_REQUIRE(state.buffer.empty() && state.queueTotalSize == 0); - state.readRequests.front()->resolve(js, entry->getValue(js)); + auto request = kj::mv(state.readRequests.front()); state.readRequests.pop_front(); + request->resolve(js, entry->getValue(js)); } void ValueQueue::handleRead(jsg::Lock& js, @@ -707,8 +708,9 @@ void ByteQueue::handlePush( // We do not need to adjust the pullInto.atLeast here since we are immediately // fulfilling the read at this point. - pending.resolve(js); + auto request = kj::mv(state.readRequests.front()); state.readRequests.pop_front(); + request->resolve(js); } // If the entry was consumed completely by the pending read, then we're done! @@ -895,8 +897,9 @@ bool ByteQueue::handleMaybeClose( // Technically, we really shouldn't get here but the case is covered // just in case. KJ_ASSERT(state.queueTotalSize == 0); - pending.resolve(js); + auto request = kj::mv(state.readRequests.front()); state.readRequests.pop_front(); + request->resolve(js); return true; } KJ_CASE_ONEOF(entry, QueueEntry) { @@ -945,8 +948,9 @@ bool ByteQueue::handleMaybeClose( // state.queueTotalSize happens to be zero, we can safely indicate that we // have read the remaining data as this may have been the last actual value // entry in the buffer. - pending.resolve(js); + auto request = kj::mv(state.readRequests.front()); state.readRequests.pop_front(); + request->resolve(js); if (state.queueTotalSize == 0) { // If the queueTotalSize is zero at this point, the next item in the queue @@ -979,8 +983,9 @@ bool ByteQueue::handleMaybeClose( // buffer. KJ_ASSERT(state.queueTotalSize > 0); - pending.resolve(js); + auto request = kj::mv(state.readRequests.front()); state.readRequests.pop_front(); + request->resolve(js); return false; } } diff --git a/src/workerd/api/streams/queue.h b/src/workerd/api/streams/queue.h index a5abbbd5ab5..e7ab67e4d1c 100644 --- a/src/workerd/api/streams/queue.h +++ b/src/workerd/api/streams/queue.h @@ -433,16 +433,20 @@ class ConsumerImpl final { auto& ready = state.requireActiveUnsafe(); KJ_REQUIRE(!ready.readRequests.empty()); KJ_REQUIRE(&req == ready.readRequests.front().get()); - req.resolve(js); + // Pop the request before resolving to ensure the request is fully owned locally. + auto request = kj::mv(ready.readRequests.front()); ready.readRequests.pop_front(); + request->resolve(js); } void resolveReadAsDone(jsg::Lock& js, ReadRequest& req) { auto& ready = state.requireActiveUnsafe(); KJ_REQUIRE(!ready.readRequests.empty()); KJ_REQUIRE(&req == ready.readRequests.front().get()); - req.resolveAsDone(js); + // Pop the request before resolving to ensure the request is fully owned locally. + auto request = kj::mv(ready.readRequests.front()); ready.readRequests.pop_front(); + request->resolveAsDone(js); } void cloneTo(jsg::Lock& js, ConsumerImpl& other) {