-
Notifications
You must be signed in to change notification settings - Fork 167
Description
What is the issue with the Streams Standard?
I'm investigating an oddity and wondered if it was a node issue, but it seems (unless I'm doing something wrong) it's in the reference implementation too.
I'm testing the scenario where you put the stream completely into pull-only mode, by making everything have a hightwaterMark of 0, EXCEPT the ultimate sink. The sink (or reader) initiates the pulling to get the data flowing 1 item at a time.
This works for ReadableStream, but not through the TransformStream. For transformStream I have to make the readableStrategy 1, which presumably acts as a sink for the upstream source and allows it to flow.
Sure - this isn't how you want to use streams, but I find it odd it doesn't seem to work in pure pull mode and TransformStreams have to buffer at least 1 item.
So my question is, does the standard allow for pure pull (from sink) to pull through TransformStreams?
My test that shows it hangs trying to do this:
globalThis.window = globalThis;
globalThis.gc = global.gc
delete globalThis.TransformStream
try{ new TransformStream(); } catch(err){
console.log("deleted node's TransformStream");
}
require("./reference/reference-implementation/lib/index.js")
let timer;
async function main() {
let strategy = new CountQueuingStrategy({ highWaterMark: 0 });
let srcGen = () => {
let counter = 0;
return new ReadableStream({
pull(controller) {
controller.enqueue(counter++);
if (counter > 3)
controller.close();
}
}, strategy);
};
// ReadableStream only 0-buffer
let result1 = [];
let src1 = srcGen();
let reader1 = src1.getReader();
while (true) {
let { value, done } = await reader1.read();
if (done) break; result1.push(value);
}
console.log("src1 results", result1);
// ReadableStream > Transform 0-buffer
let src2 = srcGen().pipeThrough(new TransformStream({}, strategy /* set to {highwaterMark:1} and it works */, strategy));
let result2 = [];
let reader2 = src2.getReader();
while (true) {
let { value, done } = await reader2.read();
if (done) break;
result2.push(value);
}
console.log("src2 results", result2);
if (timer) clearTimeout(timer);
}
timer = setTimeout(() => {
console.log("timed out");
process.exit(1);
}, 3000)
main().catch(console.error);
> node check.cjs --expose-gc
deleted node's TransformStream
src1 results [ 0, 1, 2, 3 ]
timed out
So the other case that does make sense is a WritableStream with 0 highWaterMark.
new ReadableStream({...}, {highWaterMark: 0})
.pipeTo(new WritableStream({}, {highWaterMark: 0}))
This makes sense as to why nothing happens, as without the buffer to initiate pulling in the Writable, nothing will happen.