Skip to content

Commit 6f56bb8

Browse files
committed
Replace BodyInputStream with MemoryInputStream
More preparation for using the new stream adapters. Replace the redundant BodyInputStream with MemoryInputStream.
1 parent 153c3ef commit 6f56bb8

File tree

1 file changed

+11
-53
lines changed

1 file changed

+11
-53
lines changed

src/workerd/api/http.c++

Lines changed: 11 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -591,53 +591,6 @@ jsg::Ref<Headers> Headers::deserialize(
591591
}
592592

593593
// =======================================================================================
594-
595-
namespace {
596-
597-
class BodyBufferInputStream final: public ReadableStreamSource {
598-
public:
599-
BodyBufferInputStream(Body::Buffer buffer)
600-
: unread(buffer.view),
601-
ownBytes(kj::mv(buffer.ownBytes)) {}
602-
603-
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
604-
if (unread != nullptr) {
605-
size_t amount = kj::min(maxBytes, unread.size());
606-
memcpy(buffer, unread.begin(), amount);
607-
unread = unread.slice(amount, unread.size());
608-
return amount;
609-
}
610-
611-
return static_cast<size_t>(0);
612-
}
613-
614-
kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) override {
615-
if (encoding == StreamEncoding::IDENTITY) {
616-
return unread.size();
617-
} else {
618-
// Who knows what the compressed size will be?
619-
return kj::none;
620-
}
621-
}
622-
623-
kj::Promise<DeferredProxy<void>> pumpTo(WritableStreamSink& output, bool end) override {
624-
if (unread != nullptr) {
625-
auto data = unread;
626-
unread = nullptr;
627-
co_await output.write(data);
628-
if (end) co_await output.end();
629-
}
630-
631-
co_return;
632-
}
633-
634-
private:
635-
kj::ArrayPtr<const byte> unread;
636-
kj::OneOf<kj::Own<Body::RefcountedBytes>, jsg::Ref<Blob>> ownBytes;
637-
};
638-
639-
} // namespace
640-
641594
// Make an array of characters containing random hexadecimal digits.
642595
//
643596
// Note: Rather than use random hex digits, we could generate the hex digits by hashing the
@@ -725,10 +678,14 @@ Body::ExtractedBody Body::extractBody(jsg::Lock& js, Initializer init) {
725678
}
726679
}
727680

728-
auto bodyStream = kj::heap<BodyBufferInputStream>(buffer.clone(js));
729-
730-
return {js.alloc<ReadableStream>(IoContext::current(), kj::mv(bodyStream)), kj::mv(buffer),
731-
kj::mv(contentType)};
681+
auto clonedBuffer = buffer.clone(js);
682+
auto memStream = newMemoryInputStream(clonedBuffer.view, kj::heap(kj::mv(clonedBuffer.ownBytes)));
683+
auto rs = newSystemStream(kj::mv(memStream), StreamEncoding::IDENTITY, IoContext::current());
684+
return {
685+
js.alloc<ReadableStream>(IoContext::current(), kj::mv(rs)),
686+
kj::mv(buffer),
687+
kj::mv(contentType),
688+
};
732689
}
733690

734691
Body::Body(jsg::Lock& js, kj::Maybe<ExtractedBody> init, Headers& headers)
@@ -775,8 +732,9 @@ void Body::rewindBody(jsg::Lock& js) {
775732

776733
KJ_IF_SOME(i, impl) {
777734
auto bufferCopy = KJ_ASSERT_NONNULL(i.buffer).clone(js);
778-
auto bodyStream = kj::heap<BodyBufferInputStream>(kj::mv(bufferCopy));
779-
i.stream = js.alloc<ReadableStream>(IoContext::current(), kj::mv(bodyStream));
735+
auto memStream = newMemoryInputStream(bufferCopy.view, kj::heap(kj::mv(bufferCopy.ownBytes)));
736+
auto rs = newSystemStream(kj::mv(memStream), StreamEncoding::IDENTITY, IoContext::current());
737+
i.stream = js.alloc<ReadableStream>(IoContext::current(), kj::mv(rs));
780738
}
781739
}
782740

0 commit comments

Comments
 (0)