diff --git a/src/workerd/api/streams/compression.c++ b/src/workerd/api/streams/compression.c++ index b4ac31cab26..2edd63ded4e 100644 --- a/src/workerd/api/streams/compression.c++ +++ b/src/workerd/api/streams/compression.c++ @@ -326,7 +326,6 @@ class CompressionStreamBase: public kj::Refcounted, canceler.cancel(kj::cp(reason)); transitionToErrored(kj::mv(reason)); - //state = kj::mv(reason); } kj::Promise tryReadInternal(kj::ArrayPtr dest, size_t minBytes) { diff --git a/src/workerd/api/streams/readable-source-adapter.c++ b/src/workerd/api/streams/readable-source-adapter.c++ index 36acfb9d2c0..18a0d0b9467 100644 --- a/src/workerd/api/streams/readable-source-adapter.c++ +++ b/src/workerd/api/streams/readable-source-adapter.c++ @@ -177,108 +177,104 @@ jsg::Promise ReadableStreamSourceJsAd }); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - // Deference the IoOwn once to get the active state. - Active& active = *open.active; + auto& open = state.requireActiveUnsafe(); + // Deference the IoOwn once to get the active state. + Active& active = *open.active; + + // If close is pending, we cannot accept any more reads. + // Treat them as if the stream is closed. + if (active.closePending) { + return js.resolvedPromise(ReadResult{ + .buffer = transferToEmptyBuffer(js, kj::mv(options.buffer)), + .done = true, + }); + } + + // Ok, we are in a readable state, there are no pending closes. + // Let's enqueue our read request. + auto& ioContext = IoContext::current(); - // If close is pending, we cannot accept any more reads. - // Treat them as if the stream is closed. - if (active.closePending) { + auto buffer = kj::mv(options.buffer); + auto elementSize = buffer.getElementSize(); + + // The buffer size should always be a multiple of the element size and should + // always be at least as large as minBytes. This should be handled for us by + // the jsg::BufferSource, but just to be safe, we will double-check with a + // debug assert here. + KJ_DASSERT(buffer.size() % elementSize == 0); + + auto minBytes = kj::min(options.minBytes.orDefault(elementSize), buffer.size()); + // We want to be sure that minBytes is a multiple of the element size + // of the buffer, otherwise we might never be able to satisfy the request + // correcty. If the caller provided a minBytes, and it is not a multiple + // of the element size, we will round it up to the next multiple. + if (elementSize > 1) { + minBytes = minBytes + (elementSize - (minBytes % elementSize)) % elementSize; + } + + // Note: We do not enforce that the source must provide at least minBytes + // if available here as that is part of the contract of the source itself. + // We will simply pass minBytes along to the source and it is up to the + // source to honor it. We do, however, enforce that the source must + // never return more than the size of the buffer we provided. + + // We only pass a kj::ArrayPtr to the buffer into the read call, keeping + // the actual buffer instance alive by attaching it to the JS promise + // chain that follows the read in order to keep it alive. + auto promise = active.enqueue(kj::coCapture( + [&active, buffer = buffer.asArrayPtr(), minBytes]() mutable -> kj::Promise { + // TODO(soon): The underlying kj streams API now supports passing the + // kj::ArrayPtr directly to the read call, but ReadableStreamSource has + // not yet been updated to do so. When it is, we can update this read to + // pass `buffer` directly rather than passing the begin() and size(). + co_return co_await active.source->read(buffer, minBytes); + })); + return ioContext + .awaitIo(js, kj::mv(promise), + [buffer = kj::mv(buffer), self = selfRef.addRef()](jsg::Lock& js, + size_t bytesRead) mutable -> jsg::Promise { + // If the bytesRead is 0, that indicates the stream is closed. We will + // move the stream to a closed state and return the empty buffer. + if (bytesRead == 0) { + self->runIfAlive([](ReadableStreamSourceJsAdapter& self) { + KJ_IF_SOME(open, self.state.tryGetActiveUnsafe()) { + open.active->closePending = true; + } + }); return js.resolvedPromise(ReadResult{ - .buffer = transferToEmptyBuffer(js, kj::mv(options.buffer)), + .buffer = transferToEmptyBuffer(js, kj::mv(buffer)), .done = true, }); } - - // Ok, we are in a readable state, there are no pending closes. - // Let's enqueue our read request. - auto& ioContext = IoContext::current(); - - auto buffer = kj::mv(options.buffer); - auto elementSize = buffer.getElementSize(); - - // The buffer size should always be a multiple of the element size and should - // always be at least as large as minBytes. This should be handled for us by - // the jsg::BufferSource, but just to be safe, we will double-check with a - // debug assert here. - KJ_DASSERT(buffer.size() % elementSize == 0); - - auto minBytes = kj::min(options.minBytes.orDefault(elementSize), buffer.size()); - // We want to be sure that minBytes is a multiple of the element size - // of the buffer, otherwise we might never be able to satisfy the request - // correcty. If the caller provided a minBytes, and it is not a multiple - // of the element size, we will round it up to the next multiple. - if (elementSize > 1) { - minBytes = minBytes + (elementSize - (minBytes % elementSize)) % elementSize; + KJ_DASSERT(bytesRead <= buffer.size()); + + // If bytesRead is not a multiple of the element size, that indicates + // that the source either read less than minBytes (and ended), or is + // simply unable to satisfy the element size requirement. We cannot + // provide a partial element to the caller, so reject the read. + if (bytesRead % buffer.getElementSize() != 0) { + return js.rejectedPromise( + js.typeError(kj::str("The underlying stream failed to provide a multiple of the " + "target element size ", + buffer.getElementSize()))); } - // Note: We do not enforce that the source must provide at least minBytes - // if available here as that is part of the contract of the source itself. - // We will simply pass minBytes along to the source and it is up to the - // source to honor it. We do, however, enforce that the source must - // never return more than the size of the buffer we provided. - - // We only pass a kj::ArrayPtr to the buffer into the read call, keeping - // the actual buffer instance alive by attaching it to the JS promise - // chain that follows the read in order to keep it alive. - auto promise = active.enqueue(kj::coCapture( - [&active, buffer = buffer.asArrayPtr(), minBytes]() mutable -> kj::Promise { - // TODO(soon): The underlying kj streams API now supports passing the - // kj::ArrayPtr directly to the read call, but ReadableStreamSource has - // not yet been updated to do so. When it is, we can update this read to - // pass `buffer` directly rather than passing the begin() and size(). - co_return co_await active.source->read(buffer, minBytes); - })); - return ioContext - .awaitIo(js, kj::mv(promise), - [buffer = kj::mv(buffer), self = selfRef.addRef()]( - jsg::Lock& js, size_t bytesRead) mutable - -> jsg::Promise { - // If the bytesRead is 0, that indicates the stream is closed. We will - // move the stream to a closed state and return the empty buffer. - if (bytesRead == 0) { - self->runIfAlive([](ReadableStreamSourceJsAdapter& self) { - KJ_IF_SOME(open, self.state.tryGetActiveUnsafe()) { - open.active->closePending = true; - } - }); - return js.resolvedPromise(ReadResult{ - .buffer = transferToEmptyBuffer(js, kj::mv(buffer)), - .done = true, - }); - } - KJ_DASSERT(bytesRead <= buffer.size()); - - // If bytesRead is not a multiple of the element size, that indicates - // that the source either read less than minBytes (and ended), or is - // simply unable to satisfy the element size requirement. We cannot - // provide a partial element to the caller, so reject the read. - if (bytesRead % buffer.getElementSize() != 0) { - return js.rejectedPromise( - js.typeError(kj::str("The underlying stream failed to provide a multiple of the " - "target element size ", - buffer.getElementSize()))); - } - - auto backing = buffer.detach(js); - backing.limit(bytesRead); - return js.resolvedPromise(ReadResult{ - .buffer = jsg::BufferSource(js, kj::mv(backing)), - .done = false, - }); - }) - .catch_(js, - [self = selfRef.addRef()]( - jsg::Lock& js, jsg::Value exception) -> ReadableStreamSourceJsAdapter::ReadResult { - // If an error occurred while reading, we need to transition the adapter - // to the canceled state, but only if the adapter is still alive. - auto error = jsg::JsValue(exception.getHandle(js)); - self->runIfAlive([&](ReadableStreamSourceJsAdapter& self) { self.cancel(js, error); }); - js.throwException(kj::mv(exception)); + auto backing = buffer.detach(js); + backing.limit(bytesRead); + return js.resolvedPromise(ReadResult{ + .buffer = jsg::BufferSource(js, kj::mv(backing)), + .done = false, }); - } - - KJ_UNREACHABLE; + }) + .catch_(js, + [self = selfRef.addRef()]( + jsg::Lock& js, jsg::Value exception) -> ReadableStreamSourceJsAdapter::ReadResult { + // If an error occurred while reading, we need to transition the adapter + // to the canceled state, but only if the adapter is still alive. + auto error = jsg::JsValue(exception.getHandle(js)); + self->runIfAlive([&](ReadableStreamSourceJsAdapter& self) { self.cancel(js, error); }); + js.throwException(kj::mv(exception)); + }); } // Transitions the adapter into the closing state. Once the read queue @@ -297,31 +293,28 @@ jsg::Promise ReadableStreamSourceJsAdapter::close(jsg::Lock& js) { return js.resolvedPromise(); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - auto& ioContext = IoContext::current(); - auto& active = *open.active; - - if (active.closePending) { - return js.rejectedPromise(js.typeError("Close already pending, cannot close again.")); - } + auto& open = state.requireActiveUnsafe(); + auto& ioContext = IoContext::current(); + auto& active = *open.active; - active.closePending = true; - auto promise = active.enqueue([]() -> kj::Promise { co_return 0; }); - - return ioContext - .awaitIo(js, kj::mv(promise), [self = selfRef.addRef()](jsg::Lock&, size_t) { - self->runIfAlive( - [](ReadableStreamSourceJsAdapter& self) { self.state.transitionTo(); }); - }).catch_(js, [self = selfRef.addRef()](jsg::Lock& js, jsg::Value&& exception) { - // Likewise, while nothing should be waiting on the ready promise, we - // should still reject it just in case. - auto error = jsg::JsValue(exception.getHandle(js)); - self->runIfAlive([&](ReadableStreamSourceJsAdapter& self) { self.cancel(js, error); }); - js.throwException(kj::mv(exception)); - }); + if (active.closePending) { + return js.rejectedPromise(js.typeError("Close already pending, cannot close again.")); } - KJ_UNREACHABLE; + active.closePending = true; + auto promise = active.enqueue([]() -> kj::Promise { co_return 0; }); + + return ioContext + .awaitIo(js, kj::mv(promise), [self = selfRef.addRef()](jsg::Lock&, size_t) { + self->runIfAlive( + [](ReadableStreamSourceJsAdapter& self) { self.state.transitionTo(); }); + }).catch_(js, [self = selfRef.addRef()](jsg::Lock& js, jsg::Value&& exception) { + // Likewise, while nothing should be waiting on the ready promise, we + // should still reject it just in case. + auto error = jsg::JsValue(exception.getHandle(js)); + self->runIfAlive([&](ReadableStreamSourceJsAdapter& self) { self.cancel(js, error); }); + js.throwException(kj::mv(exception)); + }); } jsg::Promise> ReadableStreamSourceJsAdapter::readAllText( @@ -339,52 +332,49 @@ jsg::Promise> ReadableStreamSourceJsAdapter::readAllTe return js.resolvedPromise(jsg::JsRef(js, js.str())); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - auto& ioContext = IoContext::current(); - auto& active = *open.active; - - if (active.closePending) { - return js.rejectedPromise>( - js.typeError("Close already pending, cannot read.")); - } - active.closePending = true; - - struct Holder { - kj::Maybe result; - }; - auto holder = kj::heap(); - - auto promise = active.enqueue([&active, &holder = *holder, limit]() -> kj::Promise { - auto str = co_await active.source->readAllText(limit); - size_t amount = str.size(); - holder.result = kj::mv(str); - co_return amount; - }); + auto& open = state.requireActiveUnsafe(); + auto& ioContext = IoContext::current(); + auto& active = *open.active; - return ioContext - .awaitIo(js, kj::mv(promise), - [self = selfRef.addRef(), holder = kj::mv(holder)](jsg::Lock& js, size_t amount) { - self->runIfAlive( - [&](ReadableStreamSourceJsAdapter& self) { self.state.transitionTo(); }); - KJ_IF_SOME(result, holder->result) { - KJ_DASSERT(result.size() == amount); - return jsg::JsRef(js, js.str(result)); - } else { - return jsg::JsRef(js, js.str()); - } - }) - .catch_(js, - [self = selfRef.addRef()]( - jsg::Lock& js, jsg::Value&& exception) -> jsg::JsRef { - // Likewise, while nothing should be waiting on the ready promise, we - // should still reject it just in case. - auto error = jsg::JsValue(exception.getHandle(js)); - self->runIfAlive([&](ReadableStreamSourceJsAdapter& self) { self.cancel(js, error); }); - js.throwException(kj::mv(exception)); - }); + if (active.closePending) { + return js.rejectedPromise>( + js.typeError("Close already pending, cannot read.")); } + active.closePending = true; - KJ_UNREACHABLE; + struct Holder { + kj::Maybe result; + }; + auto holder = kj::heap(); + + auto promise = active.enqueue([&active, &holder = *holder, limit]() -> kj::Promise { + auto str = co_await active.source->readAllText(limit); + size_t amount = str.size(); + holder.result = kj::mv(str); + co_return amount; + }); + + return ioContext + .awaitIo(js, kj::mv(promise), + [self = selfRef.addRef(), holder = kj::mv(holder)](jsg::Lock& js, size_t amount) { + self->runIfAlive( + [&](ReadableStreamSourceJsAdapter& self) { self.state.transitionTo(); }); + KJ_IF_SOME(result, holder->result) { + KJ_DASSERT(result.size() == amount); + return jsg::JsRef(js, js.str(result)); + } else { + return jsg::JsRef(js, js.str()); + } + }) + .catch_(js, + [self = selfRef.addRef()]( + jsg::Lock& js, jsg::Value&& exception) -> jsg::JsRef { + // Likewise, while nothing should be waiting on the ready promise, we + // should still reject it just in case. + auto error = jsg::JsValue(exception.getHandle(js)); + self->runIfAlive([&](ReadableStreamSourceJsAdapter& self) { self.cancel(js, error); }); + js.throwException(kj::mv(exception)); + }); } jsg::Promise ReadableStreamSourceJsAdapter::readAllBytes( @@ -403,56 +393,53 @@ jsg::Promise ReadableStreamSourceJsAdapter::readAllBytes( return js.resolvedPromise(jsg::BufferSource(js, kj::mv(backing))); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - auto& ioContext = IoContext::current(); - auto& active = *open.active; - - if (active.closePending) { - return js.rejectedPromise( - js.typeError("Close already pending, cannot read.")); - } - active.closePending = true; - - struct Holder { - kj::Maybe> result; - }; - auto holder = kj::heap(); - - auto promise = active.enqueue([&active, &holder = *holder, limit]() -> kj::Promise { - auto str = co_await active.source->readAllBytes(limit); - size_t amount = str.size(); - holder.result = kj::mv(str); - co_return amount; - }); + auto& open = state.requireActiveUnsafe(); + auto& ioContext = IoContext::current(); + auto& active = *open.active; - return ioContext - .awaitIo(js, kj::mv(promise), - [self = selfRef.addRef(), holder = kj::mv(holder)](jsg::Lock& js, size_t amount) { - self->runIfAlive( - [&](ReadableStreamSourceJsAdapter& self) { self.state.transitionTo(); }); - KJ_IF_SOME(result, holder->result) { - KJ_DASSERT(result.size() == amount); - // We have to copy the data into the backing store because of the - // v8 sandboxing rules. - auto backing = jsg::BackingStore::alloc(js, amount); - backing.asArrayPtr().copyFrom(result); - return jsg::BufferSource(js, kj::mv(backing)); - } else { - auto backing = jsg::BackingStore::alloc(js, 0); - return jsg::BufferSource(js, kj::mv(backing)); - } - }) - .catch_(js, - [self = selfRef.addRef()](jsg::Lock& js, jsg::Value&& exception) -> jsg::BufferSource { - // Likewise, while nothing should be waiting on the ready promise, we - // should still reject it just in case. - auto error = jsg::JsValue(exception.getHandle(js)); - self->runIfAlive([&](ReadableStreamSourceJsAdapter& self) { self.cancel(js, error); }); - js.throwException(kj::mv(exception)); - }); + if (active.closePending) { + return js.rejectedPromise( + js.typeError("Close already pending, cannot read.")); } + active.closePending = true; - KJ_UNREACHABLE; + struct Holder { + kj::Maybe> result; + }; + auto holder = kj::heap(); + + auto promise = active.enqueue([&active, &holder = *holder, limit]() -> kj::Promise { + auto str = co_await active.source->readAllBytes(limit); + size_t amount = str.size(); + holder.result = kj::mv(str); + co_return amount; + }); + + return ioContext + .awaitIo(js, kj::mv(promise), + [self = selfRef.addRef(), holder = kj::mv(holder)](jsg::Lock& js, size_t amount) { + self->runIfAlive( + [&](ReadableStreamSourceJsAdapter& self) { self.state.transitionTo(); }); + KJ_IF_SOME(result, holder->result) { + KJ_DASSERT(result.size() == amount); + // We have to copy the data into the backing store because of the + // v8 sandboxing rules. + auto backing = jsg::BackingStore::alloc(js, amount); + backing.asArrayPtr().copyFrom(result); + return jsg::BufferSource(js, kj::mv(backing)); + } else { + auto backing = jsg::BackingStore::alloc(js, 0); + return jsg::BufferSource(js, kj::mv(backing)); + } + }) + .catch_(js, + [self = selfRef.addRef()](jsg::Lock& js, jsg::Value&& exception) -> jsg::BufferSource { + // Likewise, while nothing should be waiting on the ready promise, we + // should still reject it just in case. + auto error = jsg::JsValue(exception.getHandle(js)); + self->runIfAlive([&](ReadableStreamSourceJsAdapter& self) { self.cancel(js, error); }); + js.throwException(kj::mv(exception)); + }); } kj::Maybe ReadableStreamSourceJsAdapter::tryGetLength(StreamEncoding encoding) { @@ -473,21 +460,18 @@ kj::Maybe ReadableStreamSourceJsAdapter::try return kj::none; } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - auto& active = *open.active; - // If we are closing, or have pending tasks, we cannot tee. - JSG_REQUIRE(!active.closePending && !active.running && active.queue.empty(), Error, - "Cannot tee a stream that is closing or has pending reads."); - auto tee = active.source->tee(limit); - auto& ioContext = IoContext::current(); - state.transitionTo(); - return Tee{ - .branch1 = kj::heap(js, ioContext, kj::mv(tee.branch1)), - .branch2 = kj::heap(js, ioContext, kj::mv(tee.branch2)), - }; - } - - KJ_UNREACHABLE; + auto& open = state.requireActiveUnsafe(); + auto& active = *open.active; + // If we are closing, or have pending tasks, we cannot tee. + JSG_REQUIRE(!active.closePending && !active.running && active.queue.empty(), Error, + "Cannot tee a stream that is closing or has pending reads."); + auto tee = active.source->tee(limit); + auto& ioContext = IoContext::current(); + state.transitionTo(); + return Tee{ + .branch1 = kj::heap(js, ioContext, kj::mv(tee.branch1)), + .branch2 = kj::heap(js, ioContext, kj::mv(tee.branch2)), + }; } // =============================================================================================== @@ -1010,40 +994,38 @@ kj::Promise ReadableSourceKjAdapter::read(kj::ArrayPtr buffer, return static_cast(0); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - auto& active = *open.active; - KJ_SWITCH_ONEOF(active.state) { - KJ_CASE_ONEOF(_, Active::Reading) { - KJ_FAIL_REQUIRE("Cannot have multiple concurrent reads."); - } - KJ_CASE_ONEOF(_, Active::Done) { - // The previous read indicated that it was the last read by returning - // less than the minimum bytes requested. We have to treat this as - // the stream being closed. - state.transitionTo(); - return static_cast(0); - } - KJ_CASE_ONEOF(canceling, Active::Canceling) { - // The stream is being canceled. Propagate the exception and complete - // the state transition. - return KJ_ASSERT_NONNULL(checkCancelingOrCanceled(active)); - } - KJ_CASE_ONEOF(canceled, Active::Canceled) { - // The stream was canceled. Propagate the exception and complete - // the state transition. - return KJ_ASSERT_NONNULL(checkCancelingOrCanceled(active)); - } - KJ_CASE_ONEOF(r, Active::Readable) { - // There is some data left over from a previous read. - return readImpl(active, buffer, minBytes); - } - KJ_CASE_ONEOF(_, Active::Idle) { - // There are no pending reads and no left over data. - return readImpl(active, buffer, minBytes); - } + auto& open = state.requireActiveUnsafe(); + auto& active = *open.active; + KJ_SWITCH_ONEOF(active.state) { + KJ_CASE_ONEOF(_, Active::Reading) { + KJ_FAIL_REQUIRE("Cannot have multiple concurrent reads."); + } + KJ_CASE_ONEOF(_, Active::Done) { + // The previous read indicated that it was the last read by returning + // less than the minimum bytes requested. We have to treat this as + // the stream being closed. + state.transitionTo(); + return static_cast(0); + } + KJ_CASE_ONEOF(canceling, Active::Canceling) { + // The stream is being canceled. Propagate the exception and complete + // the state transition. + return KJ_ASSERT_NONNULL(checkCancelingOrCanceled(active)); + } + KJ_CASE_ONEOF(canceled, Active::Canceled) { + // The stream was canceled. Propagate the exception and complete + // the state transition. + return KJ_ASSERT_NONNULL(checkCancelingOrCanceled(active)); + } + KJ_CASE_ONEOF(r, Active::Readable) { + // There is some data left over from a previous read. + return readImpl(active, buffer, minBytes); + } + KJ_CASE_ONEOF(_, Active::Idle) { + // There are no pending reads and no left over data. + return readImpl(active, buffer, minBytes); } } - KJ_UNREACHABLE; } @@ -1365,42 +1347,39 @@ kj::Promise> ReadableSourceKjAdapter::pumpTo( return newNoopDeferredProxy(); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - auto& active = *open.active; - // Per the contract for ReadableStreamSource::pumpTo, the pump operation - // will take over ownership of the underlying stream until it is complete, - // leaving the adapter itself in a closed state once the pump starts. - // Dropping the returned promise will cancel the pump operation. - // We do, however, need to first make sure that our active state is - // not already pending a read or terminal state change. - KJ_REQUIRE(!active.state.is(), "Cannot have multiple concurrent reads."); - - if (active.state.is()) { - // The previous read indicated that it was the last read by returning - // less than the minimum bytes requested, or the stream was fully - // canceled. We have to treat this as the stream being closed. - state.transitionTo(); - return newNoopDeferredProxy(); - } - - KJ_IF_SOME(exception, checkCancelingOrCanceled(active)) { - return kj::Promise>(kj::mv(exception)); - } + auto& open = state.requireActiveUnsafe(); + auto& active = *open.active; + // Per the contract for ReadableStreamSource::pumpTo, the pump operation + // will take over ownership of the underlying stream until it is complete, + // leaving the adapter itself in a closed state once the pump starts. + // Dropping the returned promise will cancel the pump operation. + // We do, however, need to first make sure that our active state is + // not already pending a read or terminal state change. + KJ_REQUIRE(!active.state.is(), "Cannot have multiple concurrent reads."); + + if (active.state.is()) { + // The previous read indicated that it was the last read by returning + // less than the minimum bytes requested, or the stream was fully + // canceled. We have to treat this as the stream being closed. + state.transitionTo(); + return newNoopDeferredProxy(); + } - // The active state should be Readable of Idle here. Let's verify. - KJ_DASSERT(active.state.is() || active.state.is()); + KJ_IF_SOME(exception, checkCancelingOrCanceled(active)) { + return kj::Promise>(kj::mv(exception)); + } - // The Active state will be transferred into the pumpImpl operation. - auto activeState = kj::mv(open.active); - state.transitionTo(); // transition to closed immediately + // The active state should be Readable of Idle here. Let's verify. + KJ_DASSERT(active.state.is() || active.state.is()); - // Because pumpToImpl is wrapping a JavaScript stream, it is not eligible - // for deferred proxying. We will return a noopDeferredProxy that wraps the - // promise from pumpToImpl(); - return addNoopDeferredProxy(pumpToImpl(kj::mv(activeState), output, end)); - } + // The Active state will be transferred into the pumpImpl operation. + auto activeState = kj::mv(open.active); + state.transitionTo(); // transition to closed immediately - KJ_UNREACHABLE; + // Because pumpToImpl is wrapping a JavaScript stream, it is not eligible + // for deferred proxying. We will return a noopDeferredProxy that wraps the + // promise from pumpToImpl(); + return addNoopDeferredProxy(pumpToImpl(kj::mv(activeState), output, end)); } ReadableSource::Tee ReadableSourceKjAdapter::tee(size_t) { @@ -1436,54 +1415,51 @@ kj::Promise> ReadableSourceKjAdapter::readAllImpl(size_t limit) { co_return kj::Array(); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - auto& active = *open.active; - KJ_REQUIRE(!active.state.is(), "Cannot have multiple concurrent reads."); + auto& open = state.requireActiveUnsafe(); + auto& active = *open.active; + KJ_REQUIRE(!active.state.is(), "Cannot have multiple concurrent reads."); - if (active.state.is()) { - // The previous read indicated that it was the last read by returning - // less than the minimum bytes requested. We have to treat this as - // the stream being closed. - state.transitionTo(); - co_return kj::Array(); - } + if (active.state.is()) { + // The previous read indicated that it was the last read by returning + // less than the minimum bytes requested. We have to treat this as + // the stream being closed. + state.transitionTo(); + co_return kj::Array(); + } - throwIfCancelingOrCanceled(active); - - // Our readAll operation will accumulate data into a buffer up to the - // specified limit. If the limit is exceeded, the returned promise will - // be rejected. Once the readAll operation starts, the adapter is moved - // into a closed state and ownership of the underlying stream is transferred - // to the readAll promise. - auto activeState = kj::mv(open.active); - state.transitionTo(); // transition to closed immediately - - KJ_DASSERT(activeState->state.is() || activeState->state.is()); - - // We do not use the canceler here. The adapter is closed and can be safely dropped. - // This promise, however, will keep the stream alive until the read is completed. - // If the returned promise is dropped, the readAll operation will be canceled. - CancelationToken cancelationToken; - co_return co_await IoContext::current().run( - [limit, active = kj::mv(activeState), cancelationToken = cancelationToken.getWeakRef()]( - jsg::Lock& js) mutable -> kj::Promise> { - kj::Vector accumulated; - // If we know the length of the stream ahead of time, and it is within the limit, - // we can reserve that much space in the accumulator to avoid multiple allocations. - KJ_IF_SOME(length, active->stream->tryGetLength(StreamEncoding::IDENTITY)) { - if (length <= limit) { - accumulated.reserve(length); // Pre-allocate - } + throwIfCancelingOrCanceled(active); + + // Our readAll operation will accumulate data into a buffer up to the + // specified limit. If the limit is exceeded, the returned promise will + // be rejected. Once the readAll operation starts, the adapter is moved + // into a closed state and ownership of the underlying stream is transferred + // to the readAll promise. + auto activeState = kj::mv(open.active); + state.transitionTo(); // transition to closed immediately + + KJ_DASSERT(activeState->state.is() || activeState->state.is()); + + // We do not use the canceler here. The adapter is closed and can be safely dropped. + // This promise, however, will keep the stream alive until the read is completed. + // If the returned promise is dropped, the readAll operation will be canceled. + CancelationToken cancelationToken; + co_return co_await IoContext::current().run( + [limit, active = kj::mv(activeState), cancelationToken = cancelationToken.getWeakRef()]( + jsg::Lock& js) mutable -> kj::Promise> { + kj::Vector accumulated; + // If we know the length of the stream ahead of time, and it is within the limit, + // we can reserve that much space in the accumulator to avoid multiple allocations. + KJ_IF_SOME(length, active->stream->tryGetLength(StreamEncoding::IDENTITY)) { + if (length <= limit) { + accumulated.reserve(length); // Pre-allocate } + } - auto& ioContext = IoContext::current(); - return ioContext.awaitJs(js, - readAllReadImpl(js, ioContext.addObject(kj::mv(active)), kj::mv(accumulated), limit, - kj::mv(cancelationToken))); - }); - } - - KJ_UNREACHABLE; + auto& ioContext = IoContext::current(); + return ioContext.awaitJs(js, + readAllReadImpl(js, ioContext.addObject(kj::mv(active)), kj::mv(accumulated), limit, + kj::mv(cancelationToken))); + }); } template diff --git a/src/workerd/api/streams/readable-source.c++ b/src/workerd/api/streams/readable-source.c++ index f2e1ca172f8..07638c907c5 100644 --- a/src/workerd/api/streams/readable-source.c++ +++ b/src/workerd/api/streams/readable-source.c++ @@ -291,15 +291,13 @@ class ReadableSourceImpl: public ReadableSource { if (state.is()) { co_return 0; } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - KJ_REQUIRE(canceler.isEmpty(), "jsg.Error: Stream is already being read"); - co_return co_await canceler.wrap(readInner(open, buffer, minBytes)); - // If the source is dropped while a read is in progress, the canceler will - // trigger and abort the read. In such cases, we don't want to wrap this - // await in a try catch because it isn't safe to continue using the stream - // as it may no longer exist. - } - KJ_UNREACHABLE; + auto& open = state.requireActiveUnsafe(); + KJ_REQUIRE(canceler.isEmpty(), "jsg.Error: Stream is already being read"); + co_return co_await canceler.wrap(readInner(open, buffer, minBytes)); + // If the source is dropped while a read is in progress, the canceler will + // trigger and abort the read. In such cases, we don't want to wrap this + // await in a try catch because it isn't safe to continue using the stream + // as it may no longer exist. } kj::Promise> pumpTo( @@ -323,32 +321,30 @@ class ReadableSourceImpl: public ReadableSource { co_return; } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - // Ownership of the underlying inner stream is transferred to the pump operation, - // where it will be either fully consumed or errored out. In either case, this - // ReadableSource becomes closed and no longer usable once pumpTo() is called. - // Critically... it is important that just because the ReadableSource is closed here - // does NOT mean that the underlying stream has been fully consumed. - auto stream = kj::mv(open.stream); - setClosed(); - - if (output.getEncoding() != getEncoding()) { - // The target encoding is different from our current encoding. - // Let's ensure that our side is in identity encoding. The destination stream will - // take care of itself. - stream = ensureIdentityEncoding(kj::mv(stream)); - } else { - // Since the encodings match, we can tell the output stream that it doesn't need to - // do any of the encoding work since we'll be providing data in the expected encoding. - KJ_ASSERT(getEncoding() == output.disownEncodingResponsibility()); - } - - // Note that because we are transferring ownership of the stream to the pump operation, - // and the pump itself should not rely on the ReadableSource for any state, it is - // safe to drop the ReadableSource once the pump operation begins. - co_return co_await pumpImpl(kj::mv(stream), output, end); + auto& open = state.requireActiveUnsafe(); + // Ownership of the underlying inner stream is transferred to the pump operation, + // where it will be either fully consumed or errored out. In either case, this + // ReadableSource becomes closed and no longer usable once pumpTo() is called. + // Critically... it is important that just because the ReadableSource is closed here + // does NOT mean that the underlying stream has been fully consumed. + auto stream = kj::mv(open.stream); + setClosed(); + + if (output.getEncoding() != getEncoding()) { + // The target encoding is different from our current encoding. + // Let's ensure that our side is in identity encoding. The destination stream will + // take care of itself. + stream = ensureIdentityEncoding(kj::mv(stream)); + } else { + // Since the encodings match, we can tell the output stream that it doesn't need to + // do any of the encoding work since we'll be providing data in the expected encoding. + KJ_ASSERT(getEncoding() == output.disownEncodingResponsibility()); } - KJ_UNREACHABLE; + + // Note that because we are transferring ownership of the stream to the pump operation, + // and the pump itself should not rely on the ReadableSource for any state, it is + // safe to drop the ReadableSource once the pump operation begins. + co_return co_await pumpImpl(kj::mv(stream), output, end); } kj::Maybe tryGetLength(rpc::StreamEncoding encoding) override { @@ -400,20 +396,18 @@ class ReadableSourceImpl: public ReadableSource { }; } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - KJ_IF_SOME(result, tryTee(limit)) { - setClosed(); - return kj::mv(result); - } - - auto teeResult = kj::newTee(kj::mv(open.stream), limit); + auto& open = state.requireActiveUnsafe(); + KJ_IF_SOME(result, tryTee(limit)) { setClosed(); - return Tee{ - .branch1 = newReadableSource(wrapTeeBranch(kj::mv(teeResult.branches[0]))), - .branch2 = newReadableSource(wrapTeeBranch(kj::mv(teeResult.branches[1]))), - }; + return kj::mv(result); } - KJ_UNREACHABLE; + + auto teeResult = kj::newTee(kj::mv(open.stream), limit); + setClosed(); + return Tee{ + .branch1 = newReadableSource(wrapTeeBranch(kj::mv(teeResult.branches[0]))), + .branch2 = newReadableSource(wrapTeeBranch(kj::mv(teeResult.branches[1]))), + }; } rpc::StreamEncoding getEncoding() override { diff --git a/src/workerd/api/streams/writable-sink-adapter.c++ b/src/workerd/api/streams/writable-sink-adapter.c++ index 70d8aabed1b..8a0d85df3f6 100644 --- a/src/workerd/api/streams/writable-sink-adapter.c++ +++ b/src/workerd/api/streams/writable-sink-adapter.c++ @@ -185,123 +185,121 @@ jsg::Promise WritableStreamSinkJsAdapter::write(jsg::Lock& js, const jsg:: return js.rejectedPromise(js.typeError("Write after close is not allowed")); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - // Dereference the IoOwn once to get the active state. - auto& active = *open.active; + auto& open = state.requireActiveUnsafe(); + // Dereference the IoOwn once to get the active state. + auto& active = *open.active; + + // If close is pending, we cannot accept any more writes. + if (active.closePending) { + auto exc = js.typeError("Write after close is not allowed"); + return js.rejectedPromise(exc); + } - // If close is pending, we cannot accept any more writes. - if (active.closePending) { - auto exc = js.typeError("Write after close is not allowed"); - return js.rejectedPromise(exc); + // Ok, we are in a writable state, there are no pending closes. + // Let's process our data and write it! + auto& ioContext = IoContext::current(); + + // We know that a WritableStreamSink only accepts bytes, so we need to + // verify that the value is a source of bytes. We accept three possible + // types: ArrayBuffer, ArrayBufferView, and String. If it is a string, + // we convert it to UTF-8 bytes. Anything else is an error. + if (value.isArrayBufferView() || value.isArrayBuffer() || value.isSharedArrayBuffer()) { + // We can just wrap the value with a jsg::BufferSource and write it. + jsg::BufferSource source(js, value); + if (active.options.detachOnWrite && source.canDetach(js)) { + // Detach from the original ArrayBuffer... + // ... and re-wrap it with a new BufferSource that we own. + source = jsg::BufferSource(js, source.detach(js)); } - // Ok, we are in a writable state, there are no pending closes. - // Let's process our data and write it! - auto& ioContext = IoContext::current(); - - // We know that a WritableStreamSink only accepts bytes, so we need to - // verify that the value is a source of bytes. We accept three possible - // types: ArrayBuffer, ArrayBufferView, and String. If it is a string, - // we convert it to UTF-8 bytes. Anything else is an error. - if (value.isArrayBufferView() || value.isArrayBuffer() || value.isSharedArrayBuffer()) { - // We can just wrap the value with a jsg::BufferSource and write it. - jsg::BufferSource source(js, value); - if (active.options.detachOnWrite && source.canDetach(js)) { - // Detach from the original ArrayBuffer... - // ... and re-wrap it with a new BufferSource that we own. - source = jsg::BufferSource(js, source.detach(js)); - } - - // Zero-length writes are a no-op. - if (source.size() == 0) { - return js.resolvedPromise(); - } + // Zero-length writes are a no-op. + if (source.size() == 0) { + return js.resolvedPromise(); + } - active.bytesInFlight += source.size(); - maybeSignalBackpressure(js); - // Enqueue the actual write operation into the write queue. We pass in - // two lambdas, one that does the actual write, and one that handles - // errors. If the write fails, we need to transition the adapter to the - // errored state. If the write succeeds, we need to decrement the - // bytesInFlight counter. - // - // The promise returned by enqueue is not the actual write promise but - // a branch forked off of it. We wrap that with a JS promise that waits - // for it to complete. Once it does, we check if we can release backpressure. - // This has to be done within an Isolate lock because we need to be able - // to resolve or reject the JS promises. If the write fails, we instead - // abort the backpressure state. - // - // This slight indirection does mean that the backpressure state change - // may be slightly delayed after the actual write completes but that's - // ok. - // - // Capturing active by reference here is safe because the lambda is - // held by the write queue, which is itself held by Active. If active - // is destroyed, the write queue is destroyed along with the lambda. - auto promise = - active.enqueue(kj::coCapture([&active, source = kj::mv(source)]() -> kj::Promise { - co_await active.sink->write(source.asArrayPtr()); - active.bytesInFlight -= source.size(); - })); - return ioContext - .awaitIo(js, kj::mv(promise), [self = selfRef.addRef()](jsg::Lock& js) { - // Why do we need a weak ref here? Well, because this is a JavaScript - // promise continuation. It is possible that the kj::Own holding our - // adapter can be dropped while we are waiting for the continuation - // to run. If that happens, we don't want to delay cleanup of the - // adapter just because of backpressure state management that would - // not be needed anymore, so we use a weak ref to update the backpressure - // state only if we are still alive. - self->runIfAlive( - [&](WritableStreamSinkJsAdapter& self) { self.maybeReleaseBackpressure(js); }); - }).catch_(js, [self = selfRef.addRef()](jsg::Lock& js, jsg::Value exception) { - auto error = jsg::JsValue(exception.getHandle(js)); - self->runIfAlive([&](WritableStreamSinkJsAdapter& self) { - self.abort(js, error); - self.backpressureState.abort(js, error); - }); - js.throwException(kj::mv(exception)); + active.bytesInFlight += source.size(); + maybeSignalBackpressure(js); + // Enqueue the actual write operation into the write queue. We pass in + // two lambdas, one that does the actual write, and one that handles + // errors. If the write fails, we need to transition the adapter to the + // errored state. If the write succeeds, we need to decrement the + // bytesInFlight counter. + // + // The promise returned by enqueue is not the actual write promise but + // a branch forked off of it. We wrap that with a JS promise that waits + // for it to complete. Once it does, we check if we can release backpressure. + // This has to be done within an Isolate lock because we need to be able + // to resolve or reject the JS promises. If the write fails, we instead + // abort the backpressure state. + // + // This slight indirection does mean that the backpressure state change + // may be slightly delayed after the actual write completes but that's + // ok. + // + // Capturing active by reference here is safe because the lambda is + // held by the write queue, which is itself held by Active. If active + // is destroyed, the write queue is destroyed along with the lambda. + auto promise = + active.enqueue(kj::coCapture([&active, source = kj::mv(source)]() -> kj::Promise { + co_await active.sink->write(source.asArrayPtr()); + active.bytesInFlight -= source.size(); + })); + return ioContext + .awaitIo(js, kj::mv(promise), [self = selfRef.addRef()](jsg::Lock& js) { + // Why do we need a weak ref here? Well, because this is a JavaScript + // promise continuation. It is possible that the kj::Own holding our + // adapter can be dropped while we are waiting for the continuation + // to run. If that happens, we don't want to delay cleanup of the + // adapter just because of backpressure state management that would + // not be needed anymore, so we use a weak ref to update the backpressure + // state only if we are still alive. + self->runIfAlive( + [&](WritableStreamSinkJsAdapter& self) { self.maybeReleaseBackpressure(js); }); + }).catch_(js, [self = selfRef.addRef()](jsg::Lock& js, jsg::Value exception) { + auto error = jsg::JsValue(exception.getHandle(js)); + self->runIfAlive([&](WritableStreamSinkJsAdapter& self) { + self.abort(js, error); + self.backpressureState.abort(js, error); }); - } else if (value.isString()) { - // Also super easy! Let's just convert the string to UTF-8 - auto str = value.toString(js); + js.throwException(kj::mv(exception)); + }); + } else if (value.isString()) { + // Also super easy! Let's just convert the string to UTF-8 + auto str = value.toString(js); - // Zero-length writes are a no-op. - if (str.size() == 0) { - return js.resolvedPromise(); - } + // Zero-length writes are a no-op. + if (str.size() == 0) { + return js.resolvedPromise(); + } - active.bytesInFlight += str.size(); - // Make sure to account for the memory used by the string while the - // write is in-flight/pending - auto accounting = js.getExternalMemoryAdjustment(str.size()); - maybeSignalBackpressure(js); - // Just like above, enqueue the write operation into the write queue, - // ensuring that we handle both the success and failure cases. - auto promise = active.enqueue(kj::coCapture( - [&active, str = kj::mv(str), accounting = kj::mv(accounting)]() -> kj::Promise { - co_await active.sink->write(str.asBytes()); - active.bytesInFlight -= str.size(); - })); - return ioContext - .awaitIo(js, kj::mv(promise), [self = selfRef.addRef()](jsg::Lock& js) { - self->runIfAlive( - [&](WritableStreamSinkJsAdapter& self) { self.maybeReleaseBackpressure(js); }); - }).catch_(js, [self = selfRef.addRef()](jsg::Lock& js, jsg::Value exception) { - auto error = jsg::JsValue(exception.getHandle(js)); - self->runIfAlive([&](WritableStreamSinkJsAdapter& self) { - self.abort(js, error); - self.backpressureState.abort(js, error); - }); - js.throwException(kj::mv(exception)); + active.bytesInFlight += str.size(); + // Make sure to account for the memory used by the string while the + // write is in-flight/pending + auto accounting = js.getExternalMemoryAdjustment(str.size()); + maybeSignalBackpressure(js); + // Just like above, enqueue the write operation into the write queue, + // ensuring that we handle both the success and failure cases. + auto promise = active.enqueue(kj::coCapture( + [&active, str = kj::mv(str), accounting = kj::mv(accounting)]() -> kj::Promise { + co_await active.sink->write(str.asBytes()); + active.bytesInFlight -= str.size(); + })); + return ioContext + .awaitIo(js, kj::mv(promise), [self = selfRef.addRef()](jsg::Lock& js) { + self->runIfAlive( + [&](WritableStreamSinkJsAdapter& self) { self.maybeReleaseBackpressure(js); }); + }).catch_(js, [self = selfRef.addRef()](jsg::Lock& js, jsg::Value exception) { + auto error = jsg::JsValue(exception.getHandle(js)); + self->runIfAlive([&](WritableStreamSinkJsAdapter& self) { + self.abort(js, error); + self.backpressureState.abort(js, error); }); - } else { - auto err = js.typeError("This WritableStream only supports writing byte types."_kj); - return js.rejectedPromise(err); - } + js.throwException(kj::mv(exception)); + }); } - KJ_UNREACHABLE; + + auto err = js.typeError("This WritableStream only supports writing byte types."_kj); + return js.rejectedPromise(err); } jsg::Promise WritableStreamSinkJsAdapter::flush(jsg::Lock& js) { @@ -317,28 +315,26 @@ jsg::Promise WritableStreamSinkJsAdapter::flush(jsg::Lock& js) { return js.rejectedPromise(js.typeError("Flush after close is not allowed")); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - // Dereference the IoOwn once to get the active state. - auto& active = *open.active; - - // If close is pending, we cannot accept any more writes. - if (active.closePending) { - auto exc = js.typeError("Flush after close is not allowed"); - return js.rejectedPromise(exc); - } + auto& open = state.requireActiveUnsafe(); + // Dereference the IoOwn once to get the active state. + auto& active = *open.active; - // Ok, we are in a writable state, there are no pending closes. - // Let's enqueue our flush signal. - auto& ioContext = IoContext::current(); - // Flushing is really just a non-op write. We enqueue a no-op task - // into the write queue and wait for it to complete. - auto promise = active.enqueue([]() -> kj::Promise { - // Non-op. - return kj::READY_NOW; - }); - return ioContext.awaitIo(js, kj::mv(promise)); + // If close is pending, we cannot accept any more writes. + if (active.closePending) { + auto exc = js.typeError("Flush after close is not allowed"); + return js.rejectedPromise(exc); } - KJ_UNREACHABLE; + + // Ok, we are in a writable state, there are no pending closes. + // Let's enqueue our flush signal. + auto& ioContext = IoContext::current(); + // Flushing is really just a non-op write. We enqueue a no-op task + // into the write queue and wait for it to complete. + auto promise = active.enqueue([]() -> kj::Promise { + // Non-op. + return kj::READY_NOW; + }); + return ioContext.awaitIo(js, kj::mv(promise)); } // Transitions the adapter into the closing state. Once the write queue @@ -357,38 +353,36 @@ jsg::Promise WritableStreamSinkJsAdapter::end(jsg::Lock& js) { return js.resolvedPromise(); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - auto& ioContext = IoContext::current(); - auto& active = *open.active; - - if (active.closePending) { - return js.rejectedPromise(js.typeError("Close already pending, cannot close again.")); - } + auto& open = state.requireActiveUnsafe(); + auto& ioContext = IoContext::current(); + auto& active = *open.active; - active.closePending = true; - auto promise = active.enqueue( - kj::coCapture([&active]() -> kj::Promise { co_await active.sink->end(); })); + if (active.closePending) { + return js.rejectedPromise(js.typeError("Close already pending, cannot close again.")); + } - return ioContext - .awaitIo(js, kj::mv(promise), [self = selfRef.addRef()](jsg::Lock& js) { - // While nothing at this point should be actually waiting on the ready promise, - // we should still resolve it just in case. - self->runIfAlive([&](WritableStreamSinkJsAdapter& self) { - self.state.transitionTo(); - self.maybeReleaseBackpressure(js); - }); - }).catch_(js, [self = selfRef.addRef()](jsg::Lock& js, jsg::Value&& exception) { - // Likewise, while nothing should be waiting on the ready promise, we - // should still reject it just in case. - auto error = jsg::JsValue(exception.getHandle(js)); - self->runIfAlive([&](WritableStreamSinkJsAdapter& self) { - self.abort(js, error); - self.backpressureState.abort(js, error); - }); - js.throwException(kj::mv(exception)); + active.closePending = true; + auto promise = active.enqueue( + kj::coCapture([&active]() -> kj::Promise { co_await active.sink->end(); })); + + return ioContext + .awaitIo(js, kj::mv(promise), [self = selfRef.addRef()](jsg::Lock& js) { + // While nothing at this point should be actually waiting on the ready promise, + // we should still resolve it just in case. + self->runIfAlive([&](WritableStreamSinkJsAdapter& self) { + self.state.transitionTo(); + self.maybeReleaseBackpressure(js); }); - } - KJ_UNREACHABLE; + }).catch_(js, [self = selfRef.addRef()](jsg::Lock& js, jsg::Value&& exception) { + // Likewise, while nothing should be waiting on the ready promise, we + // should still reject it just in case. + auto error = jsg::JsValue(exception.getHandle(js)); + self->runIfAlive([&](WritableStreamSinkJsAdapter& self) { + self.abort(js, error); + self.backpressureState.abort(js, error); + }); + js.throwException(kj::mv(exception)); + }); } // Transitions the adapter to the errored state, even if we are already closed. @@ -450,7 +444,7 @@ WritableStreamSinkJsAdapter::BackpressureState::BackpressureState( void WritableStreamSinkJsAdapter::maybeSignalBackpressure(jsg::Lock& js) { // We should only be signaling backpressure if we are in an active state. - KJ_ASSERT_NONNULL(state.tryGetActiveUnsafe()); + state.requireActiveUnsafe(); // Indicate that backpressure is being applied. If we are already in a // backpressure state (isWaiting() is true), this is a no-op. if (!backpressureState.isWaiting()) { @@ -585,67 +579,64 @@ kj::Promise WritableStreamSinkKjAdapter::write( KJ_FAIL_REQUIRE("Cannot write after close."); } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - auto& active = *open.active; - KJ_REQUIRE(!active.writePending, "Cannot have multiple concurrent writes."); - KJ_IF_SOME(exception, active.pendingAbort) { - auto exc = kj::cp(exception); - state.forceTransitionTo(kj::cp(exc)); - return kj::mv(exc); + auto& open = state.requireActiveUnsafe(); + auto& active = *open.active; + KJ_REQUIRE(!active.writePending, "Cannot have multiple concurrent writes."); + KJ_IF_SOME(exception, active.pendingAbort) { + auto exc = kj::cp(exception); + state.forceTransitionTo(kj::cp(exc)); + return kj::mv(exc); + } + if (active.closePending) { + state.transitionTo(); + KJ_FAIL_REQUIRE("Cannot write after close."); + } + active.writePending = true; + + return active.canceler + .wrap(active.ioContext.run([self = selfRef.addRef(), writer = active.writer.addRef(), + pieces = pieces](jsg::Lock& js) mutable -> kj::Promise { + size_t totalAmount = 0; + for (auto piece: pieces) { + totalAmount += piece.size(); } - if (active.closePending) { - state.transitionTo(); - KJ_FAIL_REQUIRE("Cannot write after close."); + if (totalAmount == 0) { + return kj::READY_NOW; } - active.writePending = true; - return active.canceler - .wrap( - active.ioContext.run([self = selfRef.addRef(), writer = active.writer.addRef(), - pieces = pieces](jsg::Lock& js) mutable -> kj::Promise { - size_t totalAmount = 0; - for (auto piece: pieces) { - totalAmount += piece.size(); - } - if (totalAmount == 0) { - return kj::READY_NOW; - } + // We collapse our pieces into a single ArrayBuffer for efficiency. The + // WritableStream API has no concept of a vector write, so each write + // would incur the overhead of a separate promise and microtask checkpoint. + // By collapsing into a single write we reduce that overhead. + auto backing = jsg::BackingStore::alloc(js, totalAmount); + auto ptr = backing.asArrayPtr(); + for (auto piece: pieces) { + ptr.first(piece.size()).copyFrom(piece); + ptr = ptr.slice(piece.size()); + } + jsg::BufferSource source(js, kj::mv(backing)); - // We collapse our pieces into a single ArrayBuffer for efficiency. The - // WritableStream API has no concept of a vector write, so each write - // would incur the overhead of a separate promise and microtask checkpoint. - // By collapsing into a single write we reduce that overhead. - auto backing = jsg::BackingStore::alloc(js, totalAmount); - auto ptr = backing.asArrayPtr(); - for (auto piece: pieces) { - ptr.first(piece.size()).copyFrom(piece); - ptr = ptr.slice(piece.size()); + auto ready = KJ_ASSERT_NONNULL(writer->isReady(js)); + auto promise = + ready.then(js, [writer = writer.addRef(), source = kj::mv(source)](jsg::Lock& js) mutable { + return writer->write(js, source.getHandle(js)); + }); + return IoContext::current().awaitJs(js, kj::mv(promise)); + })).then([self = selfRef.addRef()]() { + self->runIfAlive([&](WritableStreamSinkKjAdapter& self) { + KJ_IF_SOME(open, self.state.tryGetActiveUnsafe()) { + open.active->writePending = false; } - jsg::BufferSource source(js, kj::mv(backing)); - - auto ready = KJ_ASSERT_NONNULL(writer->isReady(js)); - auto promise = ready.then( - js, [writer = writer.addRef(), source = kj::mv(source)](jsg::Lock& js) mutable { - return writer->write(js, source.getHandle(js)); - }); - return IoContext::current().awaitJs(js, kj::mv(promise)); - })).then([self = selfRef.addRef()]() { - self->runIfAlive([&](WritableStreamSinkKjAdapter& self) { - KJ_IF_SOME(open, self.state.tryGetActiveUnsafe()) { - open.active->writePending = false; - } - }); - }, [self = selfRef.addRef()](kj::Exception exception) { - self->runIfAlive([&](WritableStreamSinkKjAdapter& self) { - KJ_IF_SOME(open, self.state.tryGetActiveUnsafe()) { - open.active->writePending = false; - open.active->pendingAbort = kj::cp(exception); - } - }); - kj::throwFatalException(kj::mv(exception)); }); - } - KJ_UNREACHABLE; + }, [self = selfRef.addRef()](kj::Exception exception) { + self->runIfAlive([&](WritableStreamSinkKjAdapter& self) { + KJ_IF_SOME(open, self.state.tryGetActiveUnsafe()) { + open.active->writePending = false; + open.active->pendingAbort = kj::cp(exception); + } + }); + kj::throwFatalException(kj::mv(exception)); + }); } kj::Promise WritableStreamSinkKjAdapter::end() { @@ -657,34 +648,32 @@ kj::Promise WritableStreamSinkKjAdapter::end() { return kj::READY_NOW; } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - auto& active = *open.active; - KJ_REQUIRE(!active.writePending, "Cannot have multiple concurrent writes."); - KJ_IF_SOME(exception, active.pendingAbort) { - auto exc = kj::mv(exception); - state.forceTransitionTo(kj::cp(exc)); - return kj::mv(exc); - } - if (active.closePending) { - state.transitionTo(); - return kj::READY_NOW; - } - active.closePending = true; - return active.canceler - .wrap(active.ioContext.run( - [self = selfRef.addRef(), writer = active.writer.addRef()](jsg::Lock& js) mutable { - auto promise = writer->close(js); - return IoContext::current().awaitJs(js, kj::mv(promise)); - })).catch_([self = selfRef.addRef()](kj::Exception exception) { - self->runIfAlive([&](WritableStreamSinkKjAdapter& self) { - KJ_IF_SOME(open, self.state.tryGetActiveUnsafe()) { - open.active->pendingAbort = kj::cp(exception); - } - }); - kj::throwFatalException(kj::mv(exception)); - }); + auto& open = state.requireActiveUnsafe(); + auto& active = *open.active; + KJ_REQUIRE(!active.writePending, "Cannot have multiple concurrent writes."); + KJ_IF_SOME(exception, active.pendingAbort) { + auto exc = kj::mv(exception); + state.forceTransitionTo(kj::cp(exc)); + return kj::mv(exc); + } + if (active.closePending) { + state.transitionTo(); + return kj::READY_NOW; } - KJ_UNREACHABLE; + active.closePending = true; + return active.canceler + .wrap(active.ioContext.run( + [self = selfRef.addRef(), writer = active.writer.addRef()](jsg::Lock& js) mutable { + auto promise = writer->close(js); + return IoContext::current().awaitJs(js, kj::mv(promise)); + })).catch_([self = selfRef.addRef()](kj::Exception exception) { + self->runIfAlive([&](WritableStreamSinkKjAdapter& self) { + KJ_IF_SOME(open, self.state.tryGetActiveUnsafe()) { + open.active->pendingAbort = kj::cp(exception); + } + }); + kj::throwFatalException(kj::mv(exception)); + }); } void WritableStreamSinkKjAdapter::abort(kj::Exception reason) { diff --git a/src/workerd/api/streams/writable-sink.c++ b/src/workerd/api/streams/writable-sink.c++ index 569e27df538..797a3f4cca6 100644 --- a/src/workerd/api/streams/writable-sink.c++ +++ b/src/workerd/api/streams/writable-sink.c++ @@ -87,20 +87,18 @@ class WritableSinkImpl: public WritableSink { if (state.is()) { co_return; } - KJ_IF_SOME(open, state.tryGetActiveUnsafe()) { - KJ_REQUIRE(canceler.isEmpty(), "jsg.Error: Stream is already being written to"); - // The AsyncOutputStream interface does not yet have an end() method. - // Instead, we just drop it, signaling EOF. Eventually, it might get - // an end method, at which point we should use that instead. - try { - co_await canceler.wrap(endImpl(*open.stream)); - setClosed(); - co_return; - } catch (...) { - handleOperationException(); - } + auto& open = state.requireActiveUnsafe(); + KJ_REQUIRE(canceler.isEmpty(), "jsg.Error: Stream is already being written to"); + // The AsyncOutputStream interface does not yet have an end() method. + // Instead, we just drop it, signaling EOF. Eventually, it might get + // an end method, at which point we should use that instead. + try { + co_await canceler.wrap(endImpl(*open.stream)); + setClosed(); + co_return; + } catch (...) { + handleOperationException(); } - KJ_UNREACHABLE; } void abort(kj::Exception reason) override final {