Eliminate buffering in stream.transform() #60
Conversation
lib/streams.js
```js
const result = await (done ? finish : process)(value);
if (result !== undefined) {
  controller.enqueue(result);
  return;
```
Why returning here?
To respect backpressure, after we enqueue a chunk, we need to wait until `pull` gets called again before enqueueing another (or closing the stream). With a `highWaterMark` of 0, `pull` essentially gets called for every call to `read`, so we need to respond to it with a single call to either `enqueue` or `close`.
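The one-enqueue-per-pull contract can be sketched like this (a standalone example, not the lib/streams.js code; assumes a runtime with WHATWG streams, e.g. Node 18+):

```javascript
// Minimal sketch: a pull-based ReadableStream with highWaterMark: 0.
// pull() only runs when a read() is pending, and each pull() answers
// with exactly one enqueue() (or a close()).
const source = ['a', 'b', 'c'];

const stream = new ReadableStream({
  pull(controller) {
    if (source.length === 0) {
      controller.close();               // respond to this pull with close()...
      return;
    }
    controller.enqueue(source.shift()); // ...or with a single enqueue()
  }
}, { highWaterMark: 0 });               // internal queue stays empty

async function drain() {
  const reader = stream.getReader();
  const chunks = [];
  for (let r = await reader.read(); !r.done; r = await reader.read()) {
    chunks.push(r.value);
  }
  return chunks;
}
```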
Can you add comments in the code about this? (but actually, isn't it nicer to access `controller.desiredSize` and return based on that? This way you could also drop the `allDone` logic.)
Also, I'm not sure that this non-buffering behavior is always desirable, as it affects performance? Let's have the caller request it?
Can you add comments in the code about this?
I can add some comments but most of this is also documented on https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream.
(but actually, isn't it nicer to access `controller.desiredSize` and return based on that; this way you could also drop the `allDone` logic?)
No, it doesn't work like that: `desiredSize` is always 0 when `highWaterMark` is 0; it's just that `pull` gets called when `read` gets called. And after enqueueing, `pull` will always be called again if more data is desired (regardless of what we do here), so if we enqueue multiple chunks from a single call to `pull`, we'll be enqueueing more than desired. `controller.desiredSize` is mainly useful when `highWaterMark` > 0, and particularly when using a `ByteLengthQueuingStrategy`, to know how many bytes to enqueue, but we don't use that for now.
Also, I'm not sure that this non-buffering behavior is always desirable, as it affects performance? Let's have the caller request it?
A non-zero `highWaterMark` is only useful when the transform function is asynchronous, as it enables multiple transformations to run in parallel. OpenPGP.js doesn't have any cases of that; the only async transformation (for AEAD) creates a `TransformStream` with a `highWaterMark` manually. We could still add an option for the queueing strategy, but IMHO the default should be non-buffering.
The logic I have in mind is this (based on this):
```js
async pull(controller) {
  try {
    while (true) {
      const { value, done } = await reader.read();
      const result = await (done ? finish : process)(value);
      let wasDataEnqueued = false;
      if (result !== undefined) {
        controller.enqueue(result);
        wasDataEnqueued = true;
      }
      if (done) {
        controller.close();
        input.releaseLock();
        return;
      }
      if (wasDataEnqueued && controller.desiredSize <= 0) {
        return;
      }
    }
  } catch (e) {
    controller.error(e);
  }
},
```
A non-zero highWaterMark is only useful when the transform function is asynchronous, as it enables multiple transformations to run in parallel.
Even if I don't run them in parallel, I can benefit from having data processed (and buffered) before I actually go and read the output stream, so I don't have to wait then, no?
The logic I have in mind is this (based on this):
In that example, the `highWaterMark` is 1 (the default). In that case, `desiredSize` will be either 1 or 0. In our case, `highWaterMark` is 0 so `desiredSize` is always 0 as well. Therefore, there's no point looking at it.
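The difference is easy to check directly (a standalone sketch, assuming a runtime with WHATWG streams such as Node 18+):

```javascript
// controller.desiredSize reflects the highWaterMark. With the default
// strategy (highWaterMark: 1) it starts at 1; with highWaterMark: 0 it is
// already 0 before anything is enqueued, so it can't signal "stop enqueueing".
let sizeAtHwm1, sizeAtHwm0;

new ReadableStream({ start(c) { sizeAtHwm1 = c.desiredSize; } }); // default HWM 1
new ReadableStream({ start(c) { sizeAtHwm0 = c.desiredSize; } }, { highWaterMark: 0 });
```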
Even if I don't run them in parallel, I can benefit from having data processed (and buffered) before I actually go and read the output stream, so I don't have to wait then, no?
Well, JavaScript is single-threaded. If we start producing the data before you call `read`, the code that calls `read` will be delayed until the data is produced, so no matter the order, you'll get the data at the same time. The only difference is if you're doing something asynchronous and then calling `read`, but then the solution is just to call `read` and the other thing in parallel.
In that example, the highWaterMark is 1 (the default). In that case, desiredSize will be either 1 or 0. In our case, highWaterMark is 0 so desiredSize is always 0 as well. Therefore, there's no point looking at it.
Aren't we now accepting a custom queueing strategy? Also, the more general point for me is to avoid keeping the object-scoped `allDone` around.
The only difference is if you're doing something asynchronous and then calling read, but then the solution is just to call read and the other thing in parallel.
We do a lot of async stuff in openpgpjs. I think we should default to the default highWaterMark (generally the default queuing strategy) that comes with ReadableStreams and then explicitly set it to 0 when we have a clear motivation for that (which we do).
Aren't we now accepting a custom queueing strategy? Also, the more general point for me is to avoid keeping the object-scoped allDone around.
The code as-is works with any queuing strategy. The proposed code doesn't work properly because after calling `enqueue`, `pull` is called again immediately, even if we call `controller.close()` afterwards. This means that we'll make another call to `read` and then `finish`. That's why `allDone` is needed.
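A hypothetical reconstruction of the `allDone` pattern being described (names and the dummy `process`/`finish` callbacks are assumed; the actual lib/streams.js code may differ). The flag lives outside `pull()`, so a `pull()` that arrives after `finish()` has produced its chunk can only answer with `close()`, never with another `read()`/`finish()` round:

```javascript
// A toy input stream with two chunks, transformed chunk by chunk.
const input = new ReadableStream({
  start(controller) {
    ['a', 'b'].forEach(chunk => controller.enqueue(chunk));
    controller.close();
  }
});
const reader = input.getReader();
const process = async chunk => chunk.toUpperCase(); // per-chunk transform
const finish = async () => 'END';                   // final chunk after input ends

let allDone = false;
const transformed = new ReadableStream({
  async pull(controller) {
    if (allDone) {
      controller.close(); // finish() already ran; just close
      return;
    }
    const { value, done } = await reader.read();
    const result = await (done ? finish : process)(value);
    if (done) allDone = true;
    if (result !== undefined) {
      controller.enqueue(result); // exactly one enqueue per pull
      return;
    }
    if (done) controller.close();
  }
}, { highWaterMark: 0 });

async function drainTransformed() {
  const r = transformed.getReader();
  const chunks = [];
  for (let res = await r.read(); !res.done; res = await r.read()) {
    chunks.push(res.value);
  }
  return chunks;
}
```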
We do a lot of async stuff in openpgpjs. I think we should default to the default highWaterMark (generally the default queuing strategy) that comes with ReadableStreams and then explicitly set it to 0 when we have a clear motivation for that (which we do).
We do do async stuff in OpenPGP.js, but not in the `process` function passed to `transform` here. So, defaulting to `highWaterMark: 1` doesn't make much sense, as we'll need to pass `highWaterMark: 0` literally everywhere, and the same is probably true for other callers of web-stream-tools, as synchronous transformation functions are much more common than asynchronous ones.
The default for `ReadableStream` is `highWaterMark: 1` because it's assumed that there's some underlying source that's asynchronous, but for `TransformStream`s it's not the case (the default strategy for the readable side is HWM=0, and there's a proposal to make it HWM=0 for the writable side too: whatwg/streams#1190 (comment)).
The proposed code doesn't work properly because after calling enqueue, pull is called again immediately, even if we call controller.close() afterwards.
Ah, I see. Then for future consideration: I was thinking it'd be neat to have an abstraction similar to the whatwg example, to wrap the input and get an API similar to the `createBackpressureSocket` one.
We do do async stuff in OpenPGP.js, but not in the process function passed to transform here.
We do use async transforms for e.g. AES-CFB; even if we didn't, my gripe is that this is a general-purpose lib atm, and we do accept an async `process`, so it seems to me the defaults should be in line with that... but actually, since I just recalled that `stream.transform` isn't even documented to accept async callbacks, we could expose a separate `transformAsync` (would make [future] TS happy too). Let's do that? For now, `transformAsync` can just call `transform` with a `highWaterMark` of 1 (as default). Then when we move this lib inside openpgpjs (or we bump the (major?) version here), we can clear things up and drop async support in `stream.transform`.
We do use async transforms for e.g. AES-CFB
Ah right, I missed that one.
since I just recalled that stream.transform isn't even documented to accept the async callbacks
Besides not being documented, it actually also doesn't work properly in all cases, namely when `input` is not a stream, since `process` and `finish` aren't `await`ed in that case. So, I agree we should make a separate `transformAsync`, to fix that, and drop support for async transforms in `transform`.
I'll do that in a separate PR though, since it's a more significantly breaking change than we're making here.
Force-pushed from 25e9bca to 266fe46
A `TransformStream` currently always requires an internal queue (until whatwg/streams#1158 is resolved). Therefore, don't use `TransformStream` in `transform` anymore, but create a new `ReadableStream` with the transformed chunks directly, and with a `highWaterMark` of 0, such that the internal queue is always empty.
Force-pushed from 266fe46 to 1788559