Skip to content

Commit 6fe4bd8

Browse files
authored
fix: enhance stream handling with safe enqueue and close mechanisms (#7893)
1 parent d8903e1 commit 6fe4bd8

File tree

1 file changed

+73
-37
lines changed

1 file changed

+73
-37
lines changed

packages/runtime/plugin-runtime/src/core/server/stream/createReadableStream.worker.ts

Lines changed: 73 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,38 @@ export const createReadableStreamFromElement: CreateReadableStreamFromElement =
6868
const stream = new ReadableStream({
6969
start(controller) {
7070
const pendingScripts: string[] = [];
71+
let isClosed = false;
72+
73+
const safeEnqueue = (chunk: Uint8Array | unknown) => {
74+
if (isClosed) return;
75+
try {
76+
controller.enqueue(chunk as Uint8Array);
77+
} catch {
78+
isClosed = true;
79+
}
80+
};
81+
82+
const closeController = () => {
83+
if (!isClosed) {
84+
isClosed = true;
85+
try {
86+
controller.close();
87+
} catch {
88+
// Controller already closed
89+
}
90+
}
91+
};
92+
93+
const flushPendingScripts = () => {
94+
for (const s of pendingScripts) {
95+
safeEnqueue(encodeForWebStream(s));
96+
}
97+
pendingScripts.length = 0;
98+
};
99+
71100
const enqueueScript = (script: string) => {
72101
if (shellChunkStatus === ShellChunkStatus.FINISH) {
73-
controller.enqueue(encodeForWebStream(script));
102+
safeEnqueue(encodeForWebStream(script));
74103
} else {
75104
pendingScripts.push(script);
76105
}
@@ -93,51 +122,58 @@ export const createReadableStreamFromElement: CreateReadableStreamFromElement =
93122
: [];
94123

95124
if (entries.length > 0) {
96-
enqueueFromEntries(entries, config.nonce, (s: string) =>
97-
enqueueScript(s),
98-
);
125+
enqueueFromEntries(entries, config.nonce, enqueueScript);
99126
}
100127

101128
async function push() {
102-
const { done, value } = await reader.read();
103-
if (done) {
104-
controller.close();
105-
return;
106-
}
107-
if (shellChunkStatus !== ShellChunkStatus.FINISH) {
108-
const chunk = new TextDecoder().decode(value);
109-
110-
chunkVec.push(chunk);
111-
112-
let concatedChunk = chunkVec.join('');
113-
if (concatedChunk.includes(ESCAPED_SHELL_STREAM_END_MARK)) {
114-
concatedChunk = concatedChunk.replace(
115-
ESCAPED_SHELL_STREAM_END_MARK,
116-
'',
117-
);
118-
119-
shellChunkStatus = ShellChunkStatus.FINISH;
120-
121-
controller.enqueue(
122-
encodeForWebStream(
123-
`${shellBefore}${concatedChunk}${shellAfter}`,
124-
),
125-
);
126-
// Flush any pending <script> collected before shell finished
127-
if (pendingScripts.length > 0) {
128-
for (const s of pendingScripts) {
129-
controller.enqueue(encodeForWebStream(s));
130-
}
131-
pendingScripts.length = 0;
129+
try {
130+
const { done, value } = await reader.read();
131+
if (done) {
132+
closeController();
133+
return;
134+
}
135+
136+
if (isClosed) return;
137+
138+
if (shellChunkStatus !== ShellChunkStatus.FINISH) {
139+
chunkVec.push(new TextDecoder().decode(value));
140+
const concatedChunk = chunkVec.join('');
141+
142+
if (concatedChunk.includes(ESCAPED_SHELL_STREAM_END_MARK)) {
143+
shellChunkStatus = ShellChunkStatus.FINISH;
144+
safeEnqueue(
145+
encodeForWebStream(
146+
`${shellBefore}${concatedChunk.replace(
147+
ESCAPED_SHELL_STREAM_END_MARK,
148+
'',
149+
)}${shellAfter}`,
150+
),
151+
);
152+
flushPendingScripts();
153+
}
154+
} else {
155+
safeEnqueue(value);
156+
}
157+
158+
if (!isClosed) push();
159+
} catch (error) {
160+
if (!isClosed) {
161+
isClosed = true;
162+
try {
163+
controller.error(error);
164+
} catch {
165+
// Controller already closed
132166
}
133167
}
134-
} else {
135-
controller.enqueue(value);
136168
}
137-
push();
138169
}
139170
push();
140171
},
172+
cancel(reason) {
173+
reader.cancel(reason).catch(() => {
174+
// Ignore cancellation errors
175+
});
176+
},
141177
});
142178
return stream;
143179
} catch (e) {

0 commit comments

Comments
 (0)