Skip to content

Commit ea03a24

Browse files
Fix tee() incorrectly closing before enqueuing to the second branch
1 parent 02de71a commit ea03a24

File tree

3 files changed

+56
-11
lines changed

3 files changed

+56
-11
lines changed

index.bs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2241,6 +2241,7 @@ create them does not matter.
22412241
1. Assert: |cloneForBranch2| is a boolean.
22422242
1. Let |reader| be ? [$AcquireReadableStreamDefaultReader$](|stream|).
22432243
1. Let |reading| be false.
2244+
1. Let |readAgain| be false.
22442245
1. Let |canceled1| be false.
22452246
1. Let |canceled2| be false.
22462247
1. Let |reason1| be undefined.
@@ -2249,13 +2250,15 @@ create them does not matter.
22492250
1. Let |branch2| be undefined.
22502251
1. Let |cancelPromise| be [=a new promise=].
22512252
1. Let |pullAlgorithm| be the following steps:
2252-
1. If |reading| is true, return [=a promise resolved with=] undefined.
2253+
1. If |reading| is true,
2254+
1. Set |readAgain| to true.
2255+
1. Return [=a promise resolved with=] undefined.
22532256
1. Set |reading| to true.
22542257
1. Let |readRequest| be a [=read request=] with the following [=struct/items=]:
22552258
: [=read request/chunk steps=], given |chunk|
22562259
::
22572260
1. [=Queue a microtask=] to perform the following steps:
2258-
1. Set |reading| to false.
2261+
1. Set |readAgain| to false.
22592262
1. Let |chunk1| and |chunk2| be |chunk|.
22602263
1. If |canceled2| is false and |cloneForBranch2| is true,
22612264
1. Let |cloneResult| be [$StructuredClone$](|chunk2|).
@@ -2271,6 +2274,8 @@ create them does not matter.
22712274
1. If |canceled2| is false, perform !
22722275
[$ReadableStreamDefaultControllerEnqueue$](|branch2|.[=ReadableStream/[[controller]]=],
22732276
|chunk2|).
2277+
1. Set |reading| to false.
2278+
1. If |readAgain| is true, perform |pullAlgorithm|.
22742279

22752280
<p class="note">The microtask delay here is necessary because it takes at least a microtask to
22762281
detect errors, when we use |reader|.[=ReadableStreamGenericReader/[[closedPromise]]=] below.
@@ -2331,6 +2336,8 @@ create them does not matter.
23312336
{{ReadableByteStreamController}}.
23322337
1. Let |reader| be ? [$AcquireReadableStreamDefaultReader$](|stream|).
23332338
1. Let |reading| be false.
2339+
1. Let |readAgainForBranch1| be false.
2340+
1. Let |readAgainForBranch2| be false.
23342341
1. Let |canceled1| be false.
23352342
1. Let |canceled2| be false.
23362343
1. Let |reason1| be undefined.
@@ -2357,7 +2364,8 @@ create them does not matter.
23572364
: [=read request/chunk steps=], given |chunk|
23582365
::
23592366
1. [=Queue a microtask=] to perform the following steps:
2360-
1. Set |reading| to false.
2367+
1. Set |readAgainForBranch1| to false.
2368+
1. Set |readAgainForBranch2| to false.
23612369
1. Let |chunk1| and |chunk2| be |chunk|.
23622370
1. If |canceled1| is false and |canceled2| is false,
23632371
1. Let |cloneResult| be [$CloneAsUint8Array$](|chunk|).
@@ -2373,6 +2381,9 @@ create them does not matter.
23732381
1. If |canceled2| is false, perform !
23742382
[$ReadableByteStreamControllerEnqueue$](|branch2|.[=ReadableStream/[[controller]]=],
23752383
|chunk2|).
2384+
1. Set |reading| to false.
2385+
1. If |readAgainForBranch1| is true, perform |pull1Algorithm|.
2386+
1. Otherwise, if |readAgainForBranch2| is true, perform |pull2Algorithm|.
23762387

23772388
<p class="note">The microtask delay here is necessary because it takes at least a microtask to
23782389
detect errors, when we use |reader|.[=ReadableStreamGenericReader/[[closedPromise]]=] below.
@@ -2410,7 +2421,8 @@ create them does not matter.
24102421
: [=read-into request/chunk steps=], given |chunk|
24112422
::
24122423
1. [=Queue a microtask=] to perform the following steps:
2413-
1. Set |reading| to false.
2424+
1. Set |readAgainForBranch1| to false.
2425+
1. Set |readAgainForBranch2| to false.
24142426
1. Let |byobCanceled| be |canceled2| if |forBranch2| is true, and |canceled1| otherwise.
24152427
1. Let |otherCanceled| be |canceled2| if |forBranch2| is false, and |canceled1| otherwise.
24162428
1. If |otherCanceled| is false,
@@ -2429,6 +2441,9 @@ create them does not matter.
24292441
1. Otherwise, if |byobCanceled| is false, perform !
24302442
[$ReadableByteStreamControllerRespondWithNewView$](|byobBranch|.[=ReadableStream/[[controller]]=],
24312443
|chunk|).
2444+
1. Set |reading| to false.
2445+
1. If |readAgainForBranch1| is true, perform |pull1Algorithm|.
2446+
1. Otherwise, if |readAgainForBranch2| is true, perform |pull2Algorithm|.
24322447

24332448
<p class="note">The microtask delay here is necessary because it takes at least a microtask to
24342449
detect errors, when we use |reader|.[=ReadableStreamGenericReader/[[closedPromise]]=] below.
@@ -2460,14 +2475,18 @@ create them does not matter.
24602475
1. Set |reading| to false.
24612476
1. Perform ! [$ReadableStreamBYOBReaderRead$](|reader|, |view|, |readIntoRequest|).
24622477
1. Let |pull1Algorithm| be the following steps:
2463-
1. If |reading| is true, return [=a promise resolved with=] undefined.
2478+
1. If |reading| is true,
2479+
1. Set |readAgainForBranch1| to true.
2480+
1. Return [=a promise resolved with=] undefined.
24642481
1. Set |reading| to true.
24652482
1. Let |byobRequest| be ! [$ReadableByteStreamControllerGetBYOBRequest$](|branch1|.[=ReadableStream/[[controller]]=]).
24662483
1. If |byobRequest| is null, perform |pullWithDefaultReader|.
24672484
1. Otherwise, perform |pullWithBYOBReader|, given |byobRequest|.[=ReadableStreamBYOBRequest/[[view]]=] and false.
24682485
1. Return [=a promise resolved with=] undefined.
24692486
1. Let |pull2Algorithm| be the following steps:
2470-
1. If |reading| is true, return [=a promise resolved with=] undefined.
2487+
1. If |reading| is true,
2488+
1. Set |readAgainForBranch2| to true.
2489+
1. Return [=a promise resolved with=] undefined.
24712490
1. Set |reading| to true.
24722491
1. Let |byobRequest| be ! [$ReadableByteStreamControllerGetBYOBRequest$](|branch2|.[=ReadableStream/[[controller]]=]).
24732492
1. If |byobRequest| is null, perform |pullWithDefaultReader|.

reference-implementation/lib/abstract-ops/readable-streams.js

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
348348
const reader = AcquireReadableStreamDefaultReader(stream);
349349

350350
let reading = false;
351+
let readAgain = false;
351352
let canceled1 = false;
352353
let canceled2 = false;
353354
let reason1;
@@ -359,6 +360,7 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
359360

360361
function pullAlgorithm() {
361362
if (reading === true) {
363+
readAgain = true;
362364
return promiseResolvedWith(undefined);
363365
}
364366

@@ -370,7 +372,7 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
370372
// reader._closedPromise below), and we want errors in stream to error both branches immediately. We cannot let
371373
// successful synchronously-available reads get ahead of asynchronously-available errors.
372374
queueMicrotask(() => {
373-
reading = false;
375+
readAgain = false;
374376
const chunk1 = chunk;
375377
const chunk2 = chunk;
376378

@@ -390,10 +392,14 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
390392
if (canceled1 === false) {
391393
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1);
392394
}
393-
394395
if (canceled2 === false) {
395396
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2);
396397
}
398+
399+
reading = false;
400+
if (readAgain === true) {
401+
pullAlgorithm();
402+
}
397403
});
398404
},
399405
closeSteps: () => {
@@ -460,6 +466,8 @@ function ReadableByteStreamTee(stream) {
460466
assert(ReadableByteStreamController.isImpl(stream._controller));
461467

462468
let reader = AcquireReadableStreamDefaultReader(stream);
469+
let readAgainForBranch1 = false;
470+
let readAgainForBranch2 = false;
463471
let reading = false;
464472
let canceled1 = false;
465473
let canceled2 = false;
@@ -498,7 +506,8 @@ function ReadableByteStreamTee(stream) {
498506
// reader._closedPromise below), and we want errors in stream to error both branches immediately. We cannot let
499507
// successful synchronously-available reads get ahead of asynchronously-available errors.
500508
queueMicrotask(() => {
501-
reading = false;
509+
readAgainForBranch1 = false;
510+
readAgainForBranch2 = false;
502511

503512
const chunk1 = chunk;
504513
let chunk2 = chunk;
@@ -519,6 +528,13 @@ function ReadableByteStreamTee(stream) {
519528
if (canceled2 === false) {
520529
ReadableByteStreamControllerEnqueue(branch2._controller, chunk2);
521530
}
531+
532+
reading = false;
533+
if (readAgainForBranch1 === true) {
534+
pull1Algorithm();
535+
} else if (readAgainForBranch2 === true) {
536+
pull2Algorithm();
537+
}
522538
});
523539
},
524540
closeSteps: () => {
@@ -564,7 +580,8 @@ function ReadableByteStreamTee(stream) {
564580
// reader._closedPromise below), and we want errors in stream to error both branches immediately. We cannot let
565581
// successful synchronously-available reads get ahead of asynchronously-available errors.
566582
queueMicrotask(() => {
567-
reading = false;
583+
readAgainForBranch1 = false;
584+
readAgainForBranch2 = false;
568585

569586
const byobCanceled = forBranch2 ? canceled2 : canceled1;
570587
const otherCanceled = forBranch2 ? canceled1 : canceled2;
@@ -586,6 +603,13 @@ function ReadableByteStreamTee(stream) {
586603
} else if (byobCanceled === false) {
587604
ReadableByteStreamControllerRespondWithNewView(byobBranch._controller, chunk);
588605
}
606+
607+
reading = false;
608+
if (readAgainForBranch1 === true) {
609+
pull1Algorithm();
610+
} else if (readAgainForBranch2 === true) {
611+
pull2Algorithm();
612+
}
589613
});
590614
},
591615
closeSteps: chunk => {
@@ -625,6 +649,7 @@ function ReadableByteStreamTee(stream) {
625649

626650
function pull1Algorithm() {
627651
if (reading === true) {
652+
readAgainForBranch1 = true;
628653
return promiseResolvedWith(undefined);
629654
}
630655

@@ -642,6 +667,7 @@ function ReadableByteStreamTee(stream) {
642667

643668
function pull2Algorithm() {
644669
if (reading === true) {
670+
readAgainForBranch2 = true;
645671
return promiseResolvedWith(undefined);
646672
}
647673

0 commit comments

Comments
 (0)