Skip to content

Commit 4c5c2de

Browse files
committed
Address feedback from review
1 parent 986a3d5 commit 4c5c2de

File tree

9 files changed

+359
-306
lines changed

9 files changed

+359
-306
lines changed

src/workerd/api/streams/readable-source-adapter-test.c++

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ KJ_TEST("After read BackingStore maintains identity") {
808808

809809
KJ_TEST("Read all text") {
810810
TestFixture fixture;
811-
FiniteReadSource source(2);
811+
FiniteReadSource source(4);
812812

813813
fixture.runInIoContext([&](const TestFixture::Environment& env) {
814814
kj::Own<kj::AsyncInputStream> fake(&source, kj::NullDisposer::instance);
@@ -820,15 +820,16 @@ KJ_TEST("Read all text") {
820820
adapter->readAllText(env.js).then(
821821
env.js, [&adapter = *adapter](jsg::Lock& js, jsg::JsRef<jsg::JsString> result) {
822822
auto str = result.getHandle(js).toString(js);
823-
KJ_ASSERT(str.size() == 8192);
823+
// With exponential growth strategy: 1024 + 2048 + 4096 + 8192 = 15360
824+
KJ_ASSERT(str.size() == 15360);
824825
KJ_ASSERT(adapter.isClosed(), "Adapter should be closed after readAllText()");
825826
})).attach(kj::mv(adapter));
826827
});
827828
}
828829

829830
KJ_TEST("Read all bytes") {
830831
TestFixture fixture;
831-
FiniteReadSource source(2);
832+
FiniteReadSource source(4);
832833

833834
fixture.runInIoContext([&](const TestFixture::Environment& env) {
834835
kj::Own<kj::AsyncInputStream> fake(&source, kj::NullDisposer::instance);
@@ -839,7 +840,8 @@ KJ_TEST("Read all bytes") {
839840
.awaitJs(env.js,
840841
adapter->readAllBytes(env.js).then(
841842
env.js, [&adapter = *adapter](jsg::Lock& js, jsg::BufferSource result) {
842-
KJ_ASSERT(result.size() == 8192);
843+
// With exponential growth strategy: 1024 + 2048 + 4096 + 8192 = 15360
844+
KJ_ASSERT(result.size() == 15360);
843845
KJ_ASSERT(adapter.isClosed(), "Adapter should be closed after readAllText()");
844846
})).attach(kj::mv(adapter));
845847
});
@@ -1094,7 +1096,7 @@ KJ_TEST("KjAdapter constructor with valid normal ReadableStream") {
10941096

10951097
// Teeing is unsupported so always throws
10961098
try {
1097-
adapter->tee();
1099+
adapter->tee(1);
10981100
} catch (...) {
10991101
auto ex = kj::getCaughtExceptionAsKj();
11001102
KJ_ASSERT(ex.getDescription().contains("not supported"));
@@ -1470,7 +1472,7 @@ KJ_TEST("KjAdapter pumpTo") {
14701472
auto stream = createFiniteBytesReadableStream(env.js, 1024);
14711473
auto adapter = kj::heap<ReadableStreamSourceKjAdapter>(env.js, env.context, stream.addRef());
14721474

1473-
return adapter->pumpTo(*writableSink, true).attach(kj::mv(adapter));
1475+
return adapter->pumpTo(*writableSink, EndAfterPump::YES).attach(kj::mv(adapter));
14741476
});
14751477

14761478
kj::FixedArray<kj::byte, 10 * 1024> expected;
@@ -1502,7 +1504,7 @@ KJ_TEST("KjAdapter pumpTo (no end)") {
15021504
auto stream = createFiniteBytesReadableStream(env.js, 1024);
15031505
auto adapter = kj::heap<ReadableStreamSourceKjAdapter>(env.js, env.context, stream.addRef());
15041506

1505-
return adapter->pumpTo(*writableSink, false).attach(kj::mv(adapter));
1507+
return adapter->pumpTo(*writableSink, EndAfterPump::NO).attach(kj::mv(adapter));
15061508
});
15071509

15081510
kj::FixedArray<kj::byte, 10 * 1024> expected;
@@ -1534,7 +1536,7 @@ KJ_TEST("KjAdapter pumpTo (errored)") {
15341536
auto stream = createErroredStream(env.js);
15351537
auto adapter = kj::heap<ReadableStreamSourceKjAdapter>(env.js, env.context, stream.addRef());
15361538

1537-
return env.context.waitForDeferredProxy(adapter->pumpTo(*writableSink, false))
1539+
return env.context.waitForDeferredProxy(adapter->pumpTo(*writableSink, EndAfterPump::NO))
15381540
.then([]() -> kj::Promise<void> {
15391541
KJ_FAIL_ASSERT("Should not have completed pumpTo on errored stream");
15401542
}, [](kj::Exception exception) {
@@ -1555,7 +1557,7 @@ KJ_TEST("KjAdapter pumpTo (error sink)") {
15551557
auto stream = createFiniteBytesReadableStream(env.js, 1000);
15561558
auto adapter = kj::heap<ReadableStreamSourceKjAdapter>(env.js, env.context, stream.addRef());
15571559

1558-
return env.context.waitForDeferredProxy(adapter->pumpTo(*writableSink, false))
1560+
return env.context.waitForDeferredProxy(adapter->pumpTo(*writableSink, EndAfterPump::NO))
15591561
.then([]() -> kj::Promise<void> {
15601562
KJ_FAIL_ASSERT("Should not have completed pumpTo on errored stream");
15611563
}, [](kj::Exception exception) {

src/workerd/api/streams/readable-source-adapter.c++

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -549,11 +549,11 @@ struct ReadableStreamSourceKjAdapter::ReadContext {
549549
kj::Rc<WeakRef<ReadableStreamSourceKjAdapter>> adapterRef;
550550

551551
void reset() {
552+
// Resetting is only allowed if we have the backing buffer.
553+
buffer = KJ_ASSERT_NONNULL(backingBuffer);
552554
totalRead = 0;
553555
minBytes = 0;
554556
maybeLeftOver = kj::none;
555-
// Resetting is only allowed if we have the backing buffer.
556-
buffer = KJ_ASSERT_NONNULL(backingBuffer);
557557
}
558558
};
559559

@@ -967,7 +967,7 @@ kj::Promise<size_t> ReadableStreamSourceKjAdapter::read(
967967
KJ_UNREACHABLE;
968968
}
969969

970-
kj::Maybe<uint64_t> ReadableStreamSourceKjAdapter::tryGetLength(StreamEncoding encoding) {
970+
kj::Maybe<size_t> ReadableStreamSourceKjAdapter::tryGetLength(StreamEncoding encoding) {
971971
KJ_IF_SOME(active, state.tryGet<kj::Own<Active>>()) {
972972
if (active->state.is<Active::Done>()) {
973973
// If the previous read indicated that it was the last, then
@@ -980,7 +980,8 @@ kj::Maybe<uint64_t> ReadableStreamSourceKjAdapter::tryGetLength(StreamEncoding e
980980
state = kj::mv(exception);
981981
return kj::none;
982982
}
983-
return active->stream->tryGetLength(encoding);
983+
return active->stream->tryGetLength(encoding).map(
984+
[](uint64_t len) { return static_cast<size_t>(len); });
984985
}
985986

986987
// The stream is either closed or errored.
@@ -994,7 +995,8 @@ void ReadableStreamSourceKjAdapter::cancel(kj::Exception reason) {
994995
state = kj::mv(reason);
995996
}
996997

997-
kj::Promise<void> ReadableStreamSourceKjAdapter::pumpToImpl(WritableStreamSink& output, bool end) {
998+
kj::Promise<void> ReadableStreamSourceKjAdapter::pumpToImpl(
999+
WritableStreamSink& output, EndAfterPump end) {
9981000
static constexpr size_t kMinRead = 8192;
9991001
static constexpr size_t kMaxRead = 16384;
10001002
kj::FixedArray<kj::byte, kMaxRead> buffer;
@@ -1074,7 +1076,7 @@ kj::Promise<void> ReadableStreamSourceKjAdapter::pumpToImpl(WritableStreamSink&
10741076
}
10751077

10761078
kj::Promise<DeferredProxy<void>> ReadableStreamSourceKjAdapter::pumpTo(
1077-
WritableStreamSink& output, bool end) {
1079+
WritableStreamSink& output, EndAfterPump end) {
10781080
// The pumpTo operation continually reads from the stream and writes
10791081
// to the output until the stream is closed or an error occurs. While
10801082
// pumping, the adapter is considered active but read() calls will
@@ -1135,7 +1137,7 @@ kj::Promise<DeferredProxy<void>> ReadableStreamSourceKjAdapter::pumpTo(
11351137
KJ_UNREACHABLE;
11361138
}
11371139

1138-
ReadableStreamSource::Tee ReadableStreamSourceKjAdapter::tee(kj::Maybe<size_t> maybeLimit) {
1140+
ReadableStreamSource::Tee ReadableStreamSourceKjAdapter::tee(size_t) {
11391141
KJ_UNIMPLEMENTED("Teeing a ReadableStreamSourceKjAdapter is not supported.");
11401142
// Explanation: Teeing a ReadableStream must be done under the isolate lock,
11411143
// as does creating a new ReadableStreamSourceKjAdapter. However, when tee()

src/workerd/api/streams/readable-source-adapter.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,13 +333,13 @@ class ReadableStreamSourceKjAdapter final: public ReadableStreamSource {
333333
// Per the contract of pumpTo, it is the caller's responsibility to ensure
334334
// that both the WritableStreamSink and this adapter remain alive until
335335
// the returned promise resolves!
336-
kj::Promise<DeferredProxy<void>> pumpTo(WritableStreamSink& output, bool end) override;
336+
kj::Promise<DeferredProxy<void>> pumpTo(WritableStreamSink& output, EndAfterPump end) override;
337337

338338
// If the stream is still active, tries to get the total length,
339339
// if known. If the length is not known, the encoding does not
340340
// match the encoding of the underlying stream, or the stream is closed
341341
// or errored, returns kj::none.
342-
kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) override;
342+
kj::Maybe<size_t> tryGetLength(StreamEncoding encoding) override;
343343

344344
// Cancels the underlying source if it is still active.
345345
void cancel(kj::Exception reason) override;
@@ -349,7 +349,7 @@ class ReadableStreamSourceKjAdapter final: public ReadableStreamSource {
349349
return StreamEncoding::IDENTITY;
350350
};
351351

352-
Tee tee(kj::Maybe<size_t> maybeLimit = kj::none) override;
352+
Tee tee(size_t limit) override;
353353

354354
struct ReadContext;
355355
KJ_DECLARE_NON_POLYMORPHIC(ReadContext);
@@ -363,7 +363,7 @@ class ReadableStreamSourceKjAdapter final: public ReadableStreamSource {
363363
kj::Rc<WeakRef<ReadableStreamSourceKjAdapter>> selfRef;
364364

365365
kj::Promise<size_t> readImpl(Active& active, kj::ArrayPtr<kj::byte> buffer, size_t minBytes);
366-
kj::Promise<void> pumpToImpl(WritableStreamSink& output, bool end);
366+
kj::Promise<void> pumpToImpl(WritableStreamSink& output, EndAfterPump end);
367367
static jsg::Promise<kj::Own<ReadContext>> readInternal(
368368
jsg::Lock& js, kj::Own<ReadContext> context, MinReadPolicy minReadPolicy);
369369

src/workerd/api/streams/readable-source-test.c++

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@ namespace workerd::api::streams {
1313
namespace {
1414

1515
// Mock WritableStreamSink for testing pumpTo functionality
16-
class MockWritableStreamSink: public WritableStreamSink {
16+
class MockWritableStreamSink final: public WritableStreamSink {
1717
public:
18+
MockWritableStreamSink() = default;
19+
~MockWritableStreamSink() = default;
20+
1821
kj::Promise<void> write(kj::ArrayPtr<const kj::byte> buffer) override {
1922
writeCallCount++;
2023
totalBytesWritten += buffer.size();
@@ -245,7 +248,7 @@ KJ_TEST("ReadableStreamSource pumpTo with end") {
245248
MockWritableStreamSink sink;
246249

247250
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
248-
co_await environment.context.waitForDeferredProxy(source->pumpTo(sink, true));
251+
co_await environment.context.waitForDeferredProxy(source->pumpTo(sink, EndAfterPump::YES));
249252
KJ_ASSERT(sink.totalBytesWritten == 10);
250253
KJ_ASSERT(sink.isEnded);
251254
KJ_ASSERT(sink.writtenData == testData);
@@ -263,7 +266,7 @@ KJ_TEST("ReadableStreamSource pumpTo without end") {
263266
MockWritableStreamSink sink;
264267

265268
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
266-
co_await environment.context.waitForDeferredProxy(source->pumpTo(sink, false));
269+
co_await environment.context.waitForDeferredProxy(source->pumpTo(sink, EndAfterPump::NO));
267270
KJ_ASSERT(sink.totalBytesWritten == 10);
268271
KJ_ASSERT(!sink.isEnded);
269272
KJ_ASSERT(sink.writtenData == testData);
@@ -282,7 +285,7 @@ KJ_TEST("ReadableStreamSource large pumpTo with end") {
282285
MockWritableStreamSink sink;
283286

284287
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
285-
co_await environment.context.waitForDeferredProxy(source->pumpTo(sink, true));
288+
co_await environment.context.waitForDeferredProxy(source->pumpTo(sink, EndAfterPump::YES));
286289
KJ_ASSERT(sink.totalBytesWritten == 52 * 1024);
287290
KJ_ASSERT(sink.isEnded);
288291
KJ_ASSERT(sink.writtenData == testData);
@@ -301,7 +304,8 @@ KJ_TEST("ReadableStreamSource large pumpTo canceled") {
301304
MockWritableStreamSink sink;
302305

303306
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
304-
auto promise = environment.context.waitForDeferredProxy(source->pumpTo(sink, true));
307+
auto promise =
308+
environment.context.waitForDeferredProxy(source->pumpTo(sink, EndAfterPump::YES));
305309
source->cancel(KJ_EXCEPTION(FAILED, "test abort"));
306310
try {
307311
co_await promise;
@@ -325,7 +329,8 @@ KJ_TEST("ReadableStreamSource large pumpTo canceled before") {
325329

326330
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
327331
source->cancel(KJ_EXCEPTION(FAILED, "test abort"));
328-
auto promise = environment.context.waitForDeferredProxy(source->pumpTo(sink, true));
332+
auto promise =
333+
environment.context.waitForDeferredProxy(source->pumpTo(sink, EndAfterPump::YES));
329334
try {
330335
co_await promise;
331336
} catch (...) {
@@ -349,7 +354,7 @@ KJ_TEST("ReadableStreamSource large pumpTo closed") {
349354
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
350355
auto& context = environment.context;
351356
co_await source->readAllBytes(kj::maxValue);
352-
co_await context.waitForDeferredProxy(source->pumpTo(sink, true));
357+
co_await context.waitForDeferredProxy(source->pumpTo(sink, EndAfterPump::YES));
353358
KJ_ASSERT(sink.totalBytesWritten == 0);
354359
});
355360
}
@@ -366,7 +371,8 @@ KJ_TEST("ReadableStreamSource large pumpTo, concurrent read fails") {
366371
MockWritableStreamSink sink;
367372

368373
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
369-
auto promise = environment.context.waitForDeferredProxy(source->pumpTo(sink, true));
374+
auto promise =
375+
environment.context.waitForDeferredProxy(source->pumpTo(sink, EndAfterPump::YES));
370376

371377
// Concurrent read should fail.
372378
try {
@@ -536,7 +542,7 @@ KJ_TEST("ReadableStreamSource tee (small, no limit)") {
536542
auto fakeOwn = kj::Own<MemoryAsyncInputStream>(&input, kj::NullDisposer::instance);
537543
auto source = newReadableStreamSource(kj::mv(fakeOwn));
538544

539-
auto tee = source->tee(kj::none);
545+
auto tee = source->tee(200);
540546
auto branch1 = kj::mv(tee.branch1);
541547
auto branch2 = kj::mv(tee.branch2);
542548

@@ -563,7 +569,7 @@ KJ_TEST("ReadableStreamSource tee (small, no limit, independent)") {
563569
auto fakeOwn = kj::Own<MemoryAsyncInputStream>(&input, kj::NullDisposer::instance);
564570
auto source = newReadableStreamSource(kj::mv(fakeOwn));
565571

566-
auto tee = source->tee(kj::none);
572+
auto tee = source->tee(200);
567573
auto branch1 = kj::mv(tee.branch1);
568574
auto branch2 = kj::mv(tee.branch2);
569575
branch2->cancel(KJ_EXCEPTION(FAILED, "test abort"));
@@ -596,7 +602,7 @@ KJ_TEST("ReadableStreamSource tee (large, no limit)") {
596602
auto fakeOwn = kj::Own<MemoryAsyncInputStream>(&input, kj::NullDisposer::instance);
597603
auto source = newReadableStreamSource(kj::mv(fakeOwn));
598604

599-
auto tee = source->tee(kj::none);
605+
auto tee = source->tee(0xffffffff);
600606
auto branch1 = kj::mv(tee.branch1);
601607
auto branch2 = kj::mv(tee.branch2);
602608

@@ -655,7 +661,7 @@ KJ_TEST("ReadableStreamSource after read") {
655661
KJ_ASSERT(bytesRead == 512);
656662
KJ_ASSERT(buffer.asPtr().first(bytesRead) == testData.asPtr().first(bytesRead));
657663

658-
auto tee = source->tee(kj::none);
664+
auto tee = source->tee(0xffffffff);
659665
auto branch1 = kj::mv(tee.branch1);
660666
auto branch2 = kj::mv(tee.branch2);
661667

@@ -839,7 +845,7 @@ KJ_TEST("newReadableStreamSourceFromDelegate") {
839845
return toRead;
840846
};
841847

842-
auto source = newReadableStreamSourceFromDelegate(kj::mv(producer), 5);
848+
auto source = newReadableStreamSourceFromProducer(kj::mv(producer), 5);
843849

844850
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
845851
kj::FixedArray<kj::byte, 5> buffer;
@@ -874,7 +880,7 @@ KJ_TEST("newReadableStreamSourceFromDelegate (not enough bytes)") {
874880
return toRead;
875881
};
876882

877-
auto source = newReadableStreamSourceFromDelegate(kj::mv(producer), 10);
883+
auto source = newReadableStreamSourceFromProducer(kj::mv(producer), 10);
878884

879885
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
880886
kj::FixedArray<kj::byte, 5> buffer;
@@ -919,7 +925,7 @@ KJ_TEST("Gzip encoded stream (pumpTo)") {
919925
MockWritableStreamSink sink;
920926

921927
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
922-
co_await environment.context.waitForDeferredProxy(source->pumpTo(sink, true));
928+
co_await environment.context.waitForDeferredProxy(source->pumpTo(sink, EndAfterPump::YES));
923929
});
924930

925931
KJ_ASSERT(sink.writtenData == "some data to gzip"_kjb);
@@ -937,7 +943,7 @@ KJ_TEST("Gzip encoded stream (pumpTo same encoding)") {
937943
auto sink = newEncodedWritableStreamSink(rpc::StreamEncoding::GZIP, kj::mv(fakeOwn));
938944

939945
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
940-
co_await environment.context.waitForDeferredProxy(source->pumpTo(*sink, true));
946+
co_await environment.context.waitForDeferredProxy(source->pumpTo(*sink, EndAfterPump::YES));
941947
});
942948

943949
// The data should pass through unchanged.
@@ -956,7 +962,7 @@ KJ_TEST("Gzip encoded stream (pumpTo different encoding)") {
956962
auto sink = newEncodedWritableStreamSink(rpc::StreamEncoding::BROTLI, kj::mv(fakeOwn));
957963

958964
fixture.runInIoContext([&](const auto& environment) -> kj::Promise<void> {
959-
co_await environment.context.waitForDeferredProxy(source->pumpTo(*sink, true));
965+
co_await environment.context.waitForDeferredProxy(source->pumpTo(*sink, EndAfterPump::YES));
960966
});
961967

962968
// The data shuld be brotli compressed.

0 commit comments

Comments
 (0)