Skip to content

Commit a65ff5b

Browse files
authored
fix: enhance stream handling with safe enqueue and close mechanisms (#7894)
1 parent 9de9720 commit a65ff5b

File tree

1 file changed

+73
-38
lines changed

1 file changed

+73
-38
lines changed

packages/runtime/plugin-runtime/src/core/server/stream/createReadableStream.worker.ts

Lines changed: 73 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,45 @@ export const createReadableStreamFromElement: CreateReadableStreamFromElement =
6868
const stream = new ReadableStream({
6969
start(controller) {
7070
const pendingScripts: string[] = [];
71+
let isClosed = false;
72+
73+
const safeEnqueue = (chunk: Uint8Array | unknown) => {
74+
if (isClosed) return;
75+
try {
76+
controller.enqueue(chunk as Uint8Array);
77+
} catch {
78+
isClosed = true;
79+
}
80+
};
81+
82+
const closeController = () => {
83+
if (!isClosed) {
84+
isClosed = true;
85+
try {
86+
controller.close();
87+
} catch {
88+
// Controller already closed
89+
}
90+
}
91+
};
92+
93+
const flushPendingScripts = () => {
94+
for (const s of pendingScripts) {
95+
safeEnqueue(encodeForWebStream(s));
96+
}
97+
pendingScripts.length = 0;
98+
};
99+
71100
const enqueueScript = (script: string) => {
72101
if (shellChunkStatus === ShellChunkStatus.FINISH) {
73-
controller.enqueue(encodeForWebStream(script));
102+
safeEnqueue(encodeForWebStream(script));
74103
} else {
75104
pendingScripts.push(script);
76105
}
77106
};
78107

79108
const storageContext = storage.useContext?.();
80109
const activeDeferreds = storageContext?.activeDeferreds;
81-
82110
/**
83111
* activeDeferreds is injected into storageContext by @modern-js/runtime.
84112
* @see packages/toolkit/runtime-utils/src/browser/nestedRoutes.tsx
@@ -89,51 +117,58 @@ export const createReadableStreamFromElement: CreateReadableStreamFromElement =
89117
: [];
90118

91119
if (entries.length > 0) {
92-
enqueueFromEntries(entries, config.nonce, (s: string) =>
93-
enqueueScript(s),
94-
);
120+
enqueueFromEntries(entries, config.nonce, enqueueScript);
95121
}
96122

97123
async function push() {
98-
const { done, value } = await reader.read();
99-
if (done) {
100-
controller.close();
101-
return;
102-
}
103-
if (shellChunkStatus !== ShellChunkStatus.FINISH) {
104-
const chunk = new TextDecoder().decode(value);
105-
106-
chunkVec.push(chunk);
107-
108-
let concatedChunk = chunkVec.join('');
109-
if (concatedChunk.includes(ESCAPED_SHELL_STREAM_END_MARK)) {
110-
concatedChunk = concatedChunk.replace(
111-
ESCAPED_SHELL_STREAM_END_MARK,
112-
'',
113-
);
114-
115-
shellChunkStatus = ShellChunkStatus.FINISH;
116-
117-
controller.enqueue(
118-
encodeForWebStream(
119-
`${shellBefore}${concatedChunk}${shellAfter}`,
120-
),
121-
);
122-
// Flush any pending <script> collected before shell finished
123-
if (pendingScripts.length > 0) {
124-
for (const s of pendingScripts) {
125-
controller.enqueue(encodeForWebStream(s));
126-
}
127-
pendingScripts.length = 0;
124+
try {
125+
const { done, value } = await reader.read();
126+
if (done) {
127+
closeController();
128+
return;
129+
}
130+
131+
if (isClosed) return;
132+
133+
if (shellChunkStatus !== ShellChunkStatus.FINISH) {
134+
chunkVec.push(new TextDecoder().decode(value));
135+
const concatedChunk = chunkVec.join('');
136+
137+
if (concatedChunk.includes(ESCAPED_SHELL_STREAM_END_MARK)) {
138+
shellChunkStatus = ShellChunkStatus.FINISH;
139+
safeEnqueue(
140+
encodeForWebStream(
141+
`${shellBefore}${concatedChunk.replace(
142+
ESCAPED_SHELL_STREAM_END_MARK,
143+
'',
144+
)}${shellAfter}`,
145+
),
146+
);
147+
flushPendingScripts();
148+
}
149+
} else {
150+
safeEnqueue(value);
151+
}
152+
153+
if (!isClosed) push();
154+
} catch (error) {
155+
if (!isClosed) {
156+
isClosed = true;
157+
try {
158+
controller.error(error);
159+
} catch {
160+
// Controller already closed
128161
}
129162
}
130-
} else {
131-
controller.enqueue(value);
132163
}
133-
push();
134164
}
135165
push();
136166
},
167+
cancel(reason) {
168+
reader.cancel(reason).catch(() => {
169+
// Ignore cancellation errors
170+
});
171+
},
137172
});
138173
return stream;
139174
} catch (e) {

0 commit comments

Comments
 (0)