Skip to content

Commit ff5595c

Browse files
committed
Fixes to the new streams adapters
1 parent 2e1cccc commit ff5595c

File tree

1 file changed

+39
-5
lines changed

1 file changed

+39
-5
lines changed

src/workerd/api/streams/readable-source-adapter.c++

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -755,8 +755,10 @@ jsg::Promise<kj::Own<ReadableSourceKjAdapter::ReadContext>> ReadableSourceKjAdap
755755
// At this point, we should have no left over data.
756756
KJ_DASSERT(context->maybeLeftOver == kj::none);
757757

758-
// We should also have some space left in our destination buffer.
759-
KJ_DASSERT(context->buffer.size() > 0);
758+
// If the buffer is exactly full (the chunk filled it perfectly), we're done.
759+
if (context->buffer.size() == 0) {
760+
return js.resolvedPromise(kj::mv(context));
761+
}
760762

761763
// We might continue reading only if the adapter is still alive and
762764
// in an active state...
@@ -1111,13 +1113,28 @@ kj::Promise<void> ReadableSourceKjAdapter::pumpToImpl(
11111113
.adapterRef = kj::none, // no need to track adapter liveness during pump
11121114
});
11131115

1114-
return active.ioContext.run([context = kj::mv(context), minReadPolicy](jsg::Lock& js) mutable {
1116+
// We need to pass a pointer to active into the promise continuation so we can
1117+
// save any leftover data. The caller (pumpToImpl) owns the Active and keeps it
1118+
// alive for the duration of the pump, so this is safe.
1119+
Active* activePtr = &active;
1120+
1121+
return active.ioContext.run(
1122+
[activePtr, context = kj::mv(context), minReadPolicy](jsg::Lock& js) mutable {
11151123
auto& ioContext = IoContext::current();
11161124
// The readInternal method (and the underlying read on the stream) should optimize
11171125
// itself based on the bytes available in the stream itself and the minBytes requested.
11181126
return ioContext
11191127
.awaitJs(js, ReadableSourceKjAdapter::readInternal(js, kj::mv(context), minReadPolicy))
1120-
.then([](kj::Own<ReadContext> context) mutable -> kj::Promise<size_t> {
1128+
.then([activePtr](kj::Own<ReadContext> context) mutable -> kj::Promise<size_t> {
1129+
// If there's leftover data from reading a chunk larger than the buffer,
1130+
// save it to active.state so it can be used on the next read iteration.
1131+
// Only do this if we're still in Idle state - if the state has transitioned
1132+
// to something else (e.g. Done, Canceling, Canceled), we discard the leftover.
1133+
KJ_IF_SOME(leftOver, context->maybeLeftOver) {
1134+
if (activePtr->state.is<Active::Idle>()) {
1135+
activePtr->state = kj::mv(leftOver);
1136+
}
1137+
}
11211138
return context->totalRead;
11221139
});
11231140
});
@@ -1238,12 +1255,29 @@ kj::Promise<void> ReadableSourceKjAdapter::pumpToImpl(
12381255
consecutiveFastReads = 0;
12391256
}
12401257

1241-
// Start working on the next read.
1258+
// If there's leftover data from the previous read (happens when a JS chunk
1259+
// is larger than the buffer), extract it before starting the next read.
1260+
// We must do this BEFORE starting the next read so that active->state is Idle
1261+
// when the next read's promise continuation tries to save its leftover.
1262+
kj::Maybe<Active::Readable> maybeLeftover;
1263+
KJ_IF_SOME(readable, active->state.tryGet<Active::Readable>()) {
1264+
maybeLeftover = kj::mv(readable);
1265+
}
1266+
1267+
// Start working on the next read. At this point, if there was leftover, we've
1268+
// moved it to maybeLeftover, so the next read can safely set its leftover
1269+
// to active->state when it completes.
1270+
active->state.init<Active::Idle>();
12421271
readPromise = pumpReadImpl(*active, buffers[currentReadBuf], minBytes, minReadPolicy);
12431272

12441273
{
12451274
KJ_ON_SCOPE_FAILURE(writeFailed = true);
12461275
co_await output.write(writeBuf);
1276+
1277+
// Write any leftover from the previous read.
1278+
KJ_IF_SOME(leftover, maybeLeftover) {
1279+
co_await output.write(leftover.view);
1280+
}
12471281
}
12481282
}
12491283
} catch (...) {

0 commit comments

Comments
 (0)