Skip to content

Commit 260b0c0

Browse files
authored
Add files via upload
1 parent ed4b7bd commit 260b0c0

File tree

1 file changed

+14
-18
lines changed

1 file changed

+14
-18
lines changed

src/streaming.ts

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
7979
if (e instanceof Error && e.name === 'AbortError') return;
8080
throw e;
8181
} finally {
82-
if (!done && !controller.signal.aborted) {
83-
controller.abort(); // Cleanup only if still active
84-
}
82+
// If the user `break`s, abort the ongoing request.
83+
if (!done) controller.abort();
8584
}
8685
}
8786

@@ -127,9 +126,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
127126
if (e instanceof Error && e.name === 'AbortError') return;
128127
throw e;
129128
} finally {
130-
if (!done && response.body) {
131-
response.body.cancel();
132-
}
129+
// If the user `break`s, abort the ongoing request.
130+
if (!done) controller.abort();
133131
}
134132
}
135133

@@ -210,22 +208,20 @@ export async function* _iterSSEMessages(
210208
throw new OpenAIError(`Attempted to iterate over a response with no body`);
211209
}
212210

213-
const reader = response.body.getReader(); // Explicit reader
214211
const sseDecoder = new SSEDecoder();
215212
const lineDecoder = new LineDecoder();
216213

217-
try {
218-
const iter = ReadableStreamToAsyncIterable<Bytes>(response.body);
219-
for await (const sseChunk of iterSSEChunks(iter)) {
220-
for (const line of lineDecoder.decode(sseChunk)) {
221-
const sse = sseDecoder.decode(line);
222-
if (sse) yield sse;
223-
}
214+
const iter = ReadableStreamToAsyncIterable<Bytes>(response.body);
215+
for await (const sseChunk of iterSSEChunks(iter)) {
216+
for (const line of lineDecoder.decode(sseChunk)) {
217+
const sse = sseDecoder.decode(line);
218+
if (sse) yield sse;
224219
}
225-
} finally {
226-
// Ensure cleanup when stream is done
227-
reader.cancel(); // Explicitly cancel reader
228-
controller.abort(); // Abort request to close socket
220+
}
221+
222+
for (const line of lineDecoder.flush()) {
223+
const sse = sseDecoder.decode(line);
224+
if (sse) yield sse;
229225
}
230226
}
231227

0 commit comments

Comments
 (0)