Skip to content

Commit 76959f8

Browse files
stainless-botRobertCraigie
authored andcommitted
chore(internal): minor restructuring
chore: unknown commit message
1 parent 1c6f2ba commit 76959f8

File tree

3 files changed

+37
-36
lines changed

3 files changed

+37
-36
lines changed

src/internal/decoders/line.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { OpenAIError } from '../../error';
22

3-
type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined;
3+
export type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined;
44

55
/**
66
* A re-implementation of httpx's `LineDecoder` in Python that handles incrementally

src/internal/shims.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,36 @@ export function ReadableStreamFrom<T>(iterable: Iterable<T> | AsyncIterable<T>):
110110
},
111111
});
112112
}
113+
114+
/**
115+
* Most browsers don't yet have async iterable support for ReadableStream,
116+
* and Node has a very different way of reading bytes from its "ReadableStream".
117+
*
118+
* This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
119+
*/
120+
export function ReadableStreamToAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
121+
if (stream[Symbol.asyncIterator]) return stream;
122+
123+
const reader = stream.getReader();
124+
return {
125+
async next() {
126+
try {
127+
const result = await reader.read();
128+
if (result?.done) reader.releaseLock(); // release lock when stream becomes closed
129+
return result;
130+
} catch (e) {
131+
reader.releaseLock(); // release lock when stream becomes errored
132+
throw e;
133+
}
134+
},
135+
async return() {
136+
const cancelPromise = reader.cancel();
137+
reader.releaseLock();
138+
await cancelPromise;
139+
return { done: true, value: undefined };
140+
},
141+
[Symbol.asyncIterator]() {
142+
return this;
143+
},
144+
};
145+
}

src/streaming.ts

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { OpenAIError } from './error';
22
import { type ReadableStream } from './internal/shim-types';
33
import { makeReadableStream } from './internal/shims';
44
import { LineDecoder } from './internal/decoders/line';
5+
import { ReadableStreamToAsyncIterable } from './internal/shims';
56

67
import { APIError } from './error';
78

@@ -97,7 +98,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
9798
async function* iterLines(): AsyncGenerator<string, void, unknown> {
9899
const lineDecoder = new LineDecoder();
99100

100-
const iter = readableStreamAsyncIterable<Bytes>(readableStream);
101+
const iter = ReadableStreamToAsyncIterable<Bytes>(readableStream);
101102
for await (const chunk of iter) {
102103
for (const line of lineDecoder.decode(chunk)) {
103104
yield line;
@@ -211,7 +212,7 @@ export async function* _iterSSEMessages(
211212
const sseDecoder = new SSEDecoder();
212213
const lineDecoder = new LineDecoder();
213214

214-
const iter = readableStreamAsyncIterable<Bytes>(response.body);
215+
const iter = ReadableStreamToAsyncIterable<Bytes>(response.body);
215216
for await (const sseChunk of iterSSEChunks(iter)) {
216217
for (const line of lineDecoder.decode(sseChunk)) {
217218
const sse = sseDecoder.decode(line);
@@ -364,36 +365,3 @@ function partition(str: string, delimiter: string): [string, string, string] {
364365

365366
return [str, '', ''];
366367
}
367-
368-
/**
369-
* Most browsers don't yet have async iterable support for ReadableStream,
370-
* and Node has a very different way of reading bytes from its "ReadableStream".
371-
*
372-
* This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
373-
*/
374-
export function readableStreamAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
375-
if (stream[Symbol.asyncIterator]) return stream;
376-
377-
const reader = stream.getReader();
378-
return {
379-
async next() {
380-
try {
381-
const result = await reader.read();
382-
if (result?.done) reader.releaseLock(); // release lock when stream becomes closed
383-
return result;
384-
} catch (e) {
385-
reader.releaseLock(); // release lock when stream becomes errored
386-
throw e;
387-
}
388-
},
389-
async return() {
390-
const cancelPromise = reader.cancel();
391-
reader.releaseLock();
392-
await cancelPromise;
393-
return { done: true, value: undefined };
394-
},
395-
[Symbol.asyncIterator]() {
396-
return this;
397-
},
398-
};
399-
}

0 commit comments

Comments
 (0)