Skip to content

Commit ed4b7bd

Browse files
authored
Add files via upload
1 parent 3ce189b commit ed4b7bd

File tree

1 file changed

+18
-14
lines changed

1 file changed

+18
-14
lines changed

src/streaming.ts

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ export class Stream<Item> implements AsyncIterable<Item> {
7979
if (e instanceof Error && e.name === 'AbortError') return;
8080
throw e;
8181
} finally {
82-
// If the user `break`s, abort the ongoing request.
83-
if (!done) controller.abort();
82+
if (!done && !controller.signal.aborted) {
83+
controller.abort(); // Cleanup only if still active
84+
}
8485
}
8586
}
8687

@@ -126,8 +127,9 @@ export class Stream<Item> implements AsyncIterable<Item> {
126127
if (e instanceof Error && e.name === 'AbortError') return;
127128
throw e;
128129
} finally {
129-
// If the user `break`s, abort the ongoing request.
130-
if (!done) controller.abort();
130+
if (!done && response.body) {
131+
response.body.cancel();
132+
}
131133
}
132134
}
133135

@@ -208,20 +210,22 @@ export async function* _iterSSEMessages(
208210
throw new OpenAIError(`Attempted to iterate over a response with no body`);
209211
}
210212

213+
const reader = response.body.getReader(); // Explicit reader
211214
const sseDecoder = new SSEDecoder();
212215
const lineDecoder = new LineDecoder();
213216

214-
const iter = ReadableStreamToAsyncIterable<Bytes>(response.body);
215-
for await (const sseChunk of iterSSEChunks(iter)) {
216-
for (const line of lineDecoder.decode(sseChunk)) {
217-
const sse = sseDecoder.decode(line);
218-
if (sse) yield sse;
217+
try {
218+
const iter = ReadableStreamToAsyncIterable<Bytes>(response.body);
219+
for await (const sseChunk of iterSSEChunks(iter)) {
220+
for (const line of lineDecoder.decode(sseChunk)) {
221+
const sse = sseDecoder.decode(line);
222+
if (sse) yield sse;
223+
}
219224
}
220-
}
221-
222-
for (const line of lineDecoder.flush()) {
223-
const sse = sseDecoder.decode(line);
224-
if (sse) yield sse;
225+
} finally {
226+
// Ensure cleanup when stream is done
227+
reader.cancel(); // Explicitly cancel reader
228+
controller.abort(); // Abort request to close socket
225229
}
226230
}
227231

0 commit comments

Comments
 (0)