Skip to content

Commit cacc20e

Browse files
authored
[Flight] Wait for both streams to end before closing the response (facebook#34301)
When a debug channel is defined, we must ensure that we don't close the Flight Client's response when the debug channel's readable is done, but the RSC stream is still flowing. Now, we wait for both streams to end before closing the response.
1 parent bb7c9c1 commit cacc20e

17 files changed

+514
-149
lines changed

packages/react-server-dom-esm/src/client/ReactFlightDOMClientBrowser.js

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ function createResponseFromOptions(options: void | Options) {
101101
function startReadingFromUniversalStream(
102102
response: FlightResponse,
103103
stream: ReadableStream,
104+
onDone: () => void,
104105
): void {
105106
// This is the same as startReadingFromStream except this allows WebSocketStreams which
106107
// return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially
@@ -116,8 +117,7 @@ function startReadingFromUniversalStream(
116117
...
117118
}): void | Promise<void> {
118119
if (done) {
119-
close(response);
120-
return;
120+
return onDone();
121121
}
122122
if (value instanceof ArrayBuffer) {
123123
// WebSockets can produce ArrayBuffer values in ReadableStreams.
@@ -139,7 +139,7 @@ function startReadingFromUniversalStream(
139139
function startReadingFromStream(
140140
response: FlightResponse,
141141
stream: ReadableStream,
142-
isSecondaryStream: boolean,
142+
onDone: () => void,
143143
): void {
144144
const streamState = createStreamState();
145145
const reader = stream.getReader();
@@ -152,11 +152,7 @@ function startReadingFromStream(
152152
...
153153
}): void | Promise<void> {
154154
if (done) {
155-
// If we're the secondary stream, then we don't close the response until the debug channel closes.
156-
if (!isSecondaryStream) {
157-
close(response);
158-
}
159-
return;
155+
return onDone();
160156
}
161157
const buffer: Uint8Array = (value: any);
162158
processBinaryChunk(response, streamState, buffer);
@@ -178,10 +174,20 @@ function createFromReadableStream<T>(
178174
options.debugChannel &&
179175
options.debugChannel.readable
180176
) {
181-
startReadingFromUniversalStream(response, options.debugChannel.readable);
182-
startReadingFromStream(response, stream, true);
177+
let streamDoneCount = 0;
178+
const handleDone = () => {
179+
if (++streamDoneCount === 2) {
180+
close(response);
181+
}
182+
};
183+
startReadingFromUniversalStream(
184+
response,
185+
options.debugChannel.readable,
186+
handleDone,
187+
);
188+
startReadingFromStream(response, stream, handleDone);
183189
} else {
184-
startReadingFromStream(response, stream, false);
190+
startReadingFromStream(response, stream, close.bind(null, response));
185191
}
186192
return getRoot(response);
187193
}
@@ -199,13 +205,24 @@ function createFromFetch<T>(
199205
options.debugChannel &&
200206
options.debugChannel.readable
201207
) {
208+
let streamDoneCount = 0;
209+
const handleDone = () => {
210+
if (++streamDoneCount === 2) {
211+
close(response);
212+
}
213+
};
202214
startReadingFromUniversalStream(
203215
response,
204216
options.debugChannel.readable,
217+
handleDone,
205218
);
206-
startReadingFromStream(response, (r.body: any), true);
219+
startReadingFromStream(response, (r.body: any), handleDone);
207220
} else {
208-
startReadingFromStream(response, (r.body: any), false);
221+
startReadingFromStream(
222+
response,
223+
(r.body: any),
224+
close.bind(null, response),
225+
);
209226
}
210227
},
211228
function (e) {

packages/react-server-dom-esm/src/client/ReactFlightDOMClientNode.js

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export type Options = {
6363
function startReadingFromStream(
6464
response: Response,
6565
stream: Readable,
66-
isSecondaryStream: boolean,
66+
onEnd: () => void,
6767
): void {
6868
const streamState = createStreamState();
6969

@@ -79,13 +79,7 @@ function startReadingFromStream(
7979
reportGlobalError(response, error);
8080
});
8181

82-
stream.on('end', () => {
83-
// If we're the secondary stream, then we don't close the response until the
84-
// debug channel closes.
85-
if (!isSecondaryStream) {
86-
close(response);
87-
}
88-
});
82+
stream.on('end', onEnd);
8983
}
9084

9185
function createFromNodeStream<T>(
@@ -112,10 +106,16 @@ function createFromNodeStream<T>(
112106
);
113107

114108
if (__DEV__ && options && options.debugChannel) {
115-
startReadingFromStream(response, options.debugChannel, false);
116-
startReadingFromStream(response, stream, true);
109+
let streamEndedCount = 0;
110+
const handleEnd = () => {
111+
if (++streamEndedCount === 2) {
112+
close(response);
113+
}
114+
};
115+
startReadingFromStream(response, options.debugChannel, handleEnd);
116+
startReadingFromStream(response, stream, handleEnd);
117117
} else {
118-
startReadingFromStream(response, stream, false);
118+
startReadingFromStream(response, stream, close.bind(null, response));
119119
}
120120

121121
return getRoot(response);

packages/react-server-dom-parcel/src/client/ReactFlightDOMClientBrowser.js

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ function createDebugCallbackFromWritableStream(
102102
function startReadingFromUniversalStream(
103103
response: FlightResponse,
104104
stream: ReadableStream,
105+
onDone: () => void,
105106
): void {
106107
// This is the same as startReadingFromStream except this allows WebSocketStreams which
107108
// return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially
@@ -117,8 +118,7 @@ function startReadingFromUniversalStream(
117118
...
118119
}): void | Promise<void> {
119120
if (done) {
120-
close(response);
121-
return;
121+
return onDone();
122122
}
123123
if (value instanceof ArrayBuffer) {
124124
// WebSockets can produce ArrayBuffer values in ReadableStreams.
@@ -140,7 +140,7 @@ function startReadingFromUniversalStream(
140140
function startReadingFromStream(
141141
response: FlightResponse,
142142
stream: ReadableStream,
143-
isSecondaryStream: boolean,
143+
onDone: () => void,
144144
): void {
145145
const streamState = createStreamState();
146146
const reader = stream.getReader();
@@ -153,11 +153,7 @@ function startReadingFromStream(
153153
...
154154
}): void | Promise<void> {
155155
if (done) {
156-
// If we're the secondary stream, then we don't close the response until the debug channel closes.
157-
if (!isSecondaryStream) {
158-
close(response);
159-
}
160-
return;
156+
return onDone();
161157
}
162158
const buffer: Uint8Array = (value: any);
163159
processBinaryChunk(response, streamState, buffer);
@@ -208,10 +204,20 @@ export function createFromReadableStream<T>(
208204
options.debugChannel &&
209205
options.debugChannel.readable
210206
) {
211-
startReadingFromUniversalStream(response, options.debugChannel.readable);
212-
startReadingFromStream(response, stream, true);
207+
let streamDoneCount = 0;
208+
const handleDone = () => {
209+
if (++streamDoneCount === 2) {
210+
close(response);
211+
}
212+
};
213+
startReadingFromUniversalStream(
214+
response,
215+
options.debugChannel.readable,
216+
handleDone,
217+
);
218+
startReadingFromStream(response, stream, handleDone);
213219
} else {
214-
startReadingFromStream(response, stream, false);
220+
startReadingFromStream(response, stream, close.bind(null, response));
215221
}
216222
return getRoot(response);
217223
}
@@ -250,13 +256,24 @@ export function createFromFetch<T>(
250256
options.debugChannel &&
251257
options.debugChannel.readable
252258
) {
259+
let streamDoneCount = 0;
260+
const handleDone = () => {
261+
if (++streamDoneCount === 2) {
262+
close(response);
263+
}
264+
};
253265
startReadingFromUniversalStream(
254266
response,
255267
options.debugChannel.readable,
268+
handleDone,
256269
);
257-
startReadingFromStream(response, (r.body: any), true);
270+
startReadingFromStream(response, (r.body: any), handleDone);
258271
} else {
259-
startReadingFromStream(response, (r.body: any), false);
272+
startReadingFromStream(
273+
response,
274+
(r.body: any),
275+
close.bind(null, response),
276+
);
260277
}
261278
},
262279
function (e) {

packages/react-server-dom-parcel/src/client/ReactFlightDOMClientEdge.js

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ function createResponseFromOptions(options?: Options) {
102102
function startReadingFromStream(
103103
response: FlightResponse,
104104
stream: ReadableStream,
105-
isSecondaryStream: boolean,
105+
onDone: () => void,
106106
): void {
107107
const streamState = createStreamState();
108108
const reader = stream.getReader();
@@ -115,12 +115,7 @@ function startReadingFromStream(
115115
...
116116
}): void | Promise<void> {
117117
if (done) {
118-
// If we're the secondary stream, then we don't close the response until
119-
// the debug channel closes.
120-
if (!isSecondaryStream) {
121-
close(response);
122-
}
123-
return;
118+
return onDone();
124119
}
125120
const buffer: Uint8Array = (value: any);
126121
processBinaryChunk(response, streamState, buffer);
@@ -144,10 +139,16 @@ export function createFromReadableStream<T>(
144139
options.debugChannel &&
145140
options.debugChannel.readable
146141
) {
147-
startReadingFromStream(response, options.debugChannel.readable, false);
148-
startReadingFromStream(response, stream, true);
142+
let streamDoneCount = 0;
143+
const handleDone = () => {
144+
if (++streamDoneCount === 2) {
145+
close(response);
146+
}
147+
};
148+
startReadingFromStream(response, options.debugChannel.readable, handleDone);
149+
startReadingFromStream(response, stream, handleDone);
149150
} else {
150-
startReadingFromStream(response, stream, false);
151+
startReadingFromStream(response, stream, close.bind(null, response));
151152
}
152153

153154
return getRoot(response);
@@ -166,10 +167,24 @@ export function createFromFetch<T>(
166167
options.debugChannel &&
167168
options.debugChannel.readable
168169
) {
169-
startReadingFromStream(response, options.debugChannel.readable, false);
170-
startReadingFromStream(response, (r.body: any), true);
170+
let streamDoneCount = 0;
171+
const handleDone = () => {
172+
if (++streamDoneCount === 2) {
173+
close(response);
174+
}
175+
};
176+
startReadingFromStream(
177+
response,
178+
options.debugChannel.readable,
179+
handleDone,
180+
);
181+
startReadingFromStream(response, (r.body: any), handleDone);
171182
} else {
172-
startReadingFromStream(response, (r.body: any), false);
183+
startReadingFromStream(
184+
response,
185+
(r.body: any),
186+
close.bind(null, response),
187+
);
173188
}
174189
},
175190
function (e) {

packages/react-server-dom-parcel/src/client/ReactFlightDOMClientNode.js

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ export type Options = {
5959
function startReadingFromStream(
6060
response: Response,
6161
stream: Readable,
62-
isSecondaryStream: boolean,
62+
onEnd: () => void,
6363
): void {
6464
const streamState = createStreamState();
6565

@@ -75,13 +75,7 @@ function startReadingFromStream(
7575
reportGlobalError(response, error);
7676
});
7777

78-
stream.on('end', () => {
79-
// If we're the secondary stream, then we don't close the response until the
80-
// debug channel closes.
81-
if (!isSecondaryStream) {
82-
close(response);
83-
}
84-
});
78+
stream.on('end', onEnd);
8579
}
8680

8781
export function createFromNodeStream<T>(
@@ -104,10 +98,16 @@ export function createFromNodeStream<T>(
10498
);
10599

106100
if (__DEV__ && options && options.debugChannel) {
107-
startReadingFromStream(response, options.debugChannel, false);
108-
startReadingFromStream(response, stream, true);
101+
let streamEndedCount = 0;
102+
const handleEnd = () => {
103+
if (++streamEndedCount === 2) {
104+
close(response);
105+
}
106+
};
107+
startReadingFromStream(response, options.debugChannel, handleEnd);
108+
startReadingFromStream(response, stream, handleEnd);
109109
} else {
110-
startReadingFromStream(response, stream, false);
110+
startReadingFromStream(response, stream, close.bind(null, response));
111111
}
112112

113113
return getRoot(response);

0 commit comments

Comments
 (0)