Skip to content

Commit 92f34fd

Browse files
committed
transformWithCancel: fix race condition in cancellation (#43)
Relevant for e.g.passiveClones.
1 parent 0ce871e commit 92f34fd

File tree

1 file changed

+18
-4
lines changed

1 file changed

+18
-4
lines changed

lib/streams.js

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,10 @@ function transformRaw(input, options) {
165165
* @param {Function} cancel
166166
* @returns {TransformStream}
167167
*/
168-
function transformWithCancel(cancel) {
168+
function transformWithCancel(customCancel) {
169169
let pulled = false;
170-
let backpressureChangePromiseResolve;
170+
let cancelled = false;
171+
let backpressureChangePromiseResolve, backpressureChangePromiseReject;
171172
let outputController;
172173
return {
173174
readable: new ReadableStream({
@@ -181,16 +182,29 @@ function transformWithCancel(cancel) {
181182
pulled = true;
182183
}
183184
},
184-
cancel
185+
async cancel(reason) {
186+
cancelled = true;
187+
if (customCancel) {
188+
await customCancel(reason);
189+
}
190+
if (backpressureChangePromiseReject) {
191+
backpressureChangePromiseReject(reason);
192+
}
193+
}
185194
}, {highWaterMark: 0}),
186195
writable: new WritableStream({
187196
write: async function(chunk) {
197+
if (cancelled) {
198+
throw new Error('Stream is cancelled');
199+
}
188200
outputController.enqueue(chunk);
189201
if (!pulled) {
190-
await new Promise(resolve => {
202+
await new Promise((resolve, reject) => {
191203
backpressureChangePromiseResolve = resolve;
204+
backpressureChangePromiseReject = reject;
192205
});
193206
backpressureChangePromiseResolve = null;
207+
backpressureChangePromiseReject = null;
194208
} else {
195209
pulled = false;
196210
}

0 commit comments

Comments
 (0)