
Commit c97a9a3

fix: openai streaming
- OpenAI uses a different streaming format that ends the stream with a `[DONE]` sentinel instead of a final `finish_reason` message
1 parent 700618d commit c97a9a3
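For context, an OpenAI chat-completions stream is a sequence of SSE `data:` frames: each chunk carries its content under `delta` rather than `message`, token usage arrives in a trailing chunk once `stream_options.include_usage` is set, and the stream is closed by a literal `[DONE]` sentinel rather than by a chunk with a `finish_reason`. An abridged illustration of the wire format (field values made up):

data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hel"},"finish_reason":null}]}

data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: {"object":"chat.completion.chunk","choices":[],"usage":{"prompt_tokens":9,"completion_tokens":12,"total_tokens":21}}

data: [DONE]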

3 files changed: +42 additions, -17 deletions

ext/ai/js/llm/providers/openai.ts

Lines changed: 31 additions & 15 deletions
@@ -31,6 +31,9 @@ export type OpenAIRequest = {
   top_p?: number;
   n?: number;
   stream?: boolean;
+  stream_options: {
+    include_usage: boolean;
+  };
   stop?: string | string[];
   max_tokens?: number;
   presence_penalty?: number;
@@ -69,7 +72,7 @@ export type OpenAIResponseUsage = {
 
 export type OpenAIResponseChoice = {
   index: number;
-  message: {
+  message?: {
     role: "assistant" | "user" | "system" | "tool";
     content: string | null;
     function_call?: {
@@ -85,6 +88,9 @@ export type OpenAIResponseChoice = {
       };
     }[];
   };
+  delta?: {
+    content: string | null;
+  };
   finish_reason: "stop" | "length" | "tool_calls" | "content_filter" | null;
 };
 
@@ -98,7 +104,10 @@ export type OpenAIResponse = {
   usage?: OpenAIResponseUsage;
 };
 
-export type OpenAICompatibleInput = Omit<OpenAIRequest, "stream" | "model">;
+export type OpenAICompatibleInput = Omit<
+  OpenAIRequest,
+  "stream" | "stream_options" | "model"
+>;
 
 export type OpenAIProviderInput = ILLMProviderInput<OpenAICompatibleInput>;
 export type OpenAIProviderOutput = ILLMProviderOutput<OpenAIResponse>;
@@ -126,23 +135,25 @@ export class OpenAILLMSession implements ILLMProvider, ILLMProviderMeta {
     const parser = this.parse;
     const stream = async function* () {
       for await (const message of generator) {
+        // NOTE:(kallebysantos) while streaming, the final message will not include 'finish_reason';
+        // instead a '[DONE]' value is returned to close the stream
+        if ("done" in message && message.done) {
+          return;
+        }
+
         if ("error" in message) {
           if (message.error instanceof Error) {
             throw message.error;
-          } else {
-            throw new Error(message.error as string);
           }
+
+          throw new Error(message.error as string);
         }
 
         yield parser(message);
-        const finishReason = message.choices[0].finish_reason;
 
-        if (finishReason) {
-          if (finishReason !== "stop") {
-            throw new Error("Expected a completed response.");
-          }
-
-          return;
+        const finish_reason = message.choices.at(0)?.finish_reason;
+        if (finish_reason && finish_reason !== "stop") {
+          throw new Error("Expected a completed response.");
         }
       }
 
@@ -172,12 +183,14 @@ export class OpenAILLMSession implements ILLMProvider, ILLMProviderMeta {
     return this.parse(response);
   }
 
-  private parse(message: OpenAIResponse): OpenAIProviderOutput {
-    const { usage } = message;
+  private parse(response: OpenAIResponse): OpenAIProviderOutput {
+    const { usage } = response;
+    const choice = response.choices.at(0);
 
     return {
-      value: message.choices.at(0)?.message.content ?? undefined,
-      inner: message,
+      // NOTE:(kallebysantos) while streaming, the 'delta' field is used instead of 'message'
+      value: choice?.message?.content ?? choice?.delta?.content ?? undefined,
+      inner: response,
       usage: {
         // NOTE:(kallebysantos) usage may be 'null' while streaming, but the final message will include it
         inputTokens: usage?.prompt_tokens ?? 0,
@@ -204,6 +217,9 @@ export class OpenAILLMSession implements ILLMProvider, ILLMProviderMeta {
           ...input,
           model: this.options.model,
           stream,
+          stream_options: {
+            include_usage: true,
+          },
         } satisfies OpenAIRequest,
       ),
       signal,
ext/ai/js/llm/utils/event_source_stream.mjs

Lines changed: 10 additions & 2 deletions
@@ -1,4 +1,4 @@
-import EventStreamParser from './event_stream_parser.mjs';
+import EventStreamParser from "./event_stream_parser.mjs";
 /**
  * A Web stream which handles Server-Sent Events from a binary ReadableStream like you get from the fetch API.
  * Implements the TransformStream interface, and can be used with the Streams API as such.
@@ -9,11 +9,19 @@ class EventSourceStream {
     // 1. The SSE spec allows for an optional UTF-8 BOM.
     // 2. We have to use a *streaming* decoder, in case two adjacent data chunks are split up in the middle of a
     // multibyte Unicode character. Trying to parse the two separately would result in data corruption.
-    const decoder = new TextDecoderStream('utf-8');
+    const decoder = new TextDecoderStream("utf-8");
     let parser;
     const sseStream = new TransformStream({
       start(controller) {
         parser = new EventStreamParser((data, eventType, lastEventId) => {
+          // NOTE:(kallebysantos) some providers like OpenAI send '[DONE]' to indicate that the
+          // stream has terminated, so check whether the SSE data is "[DONE]" and close the stream
+          if (typeof data === "string" && data.trim() === "[DONE]") {
+            controller.terminate?.(); // If supported
+            controller.close?.(); // Fallback
+            return;
+          }
+
           controller.enqueue(
             new MessageEvent(eventType, { data, lastEventId }),
           );
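A quick usage sketch of the new terminator handling (assumes a runtime where ReadableStream is async-iterable, such as Deno, and that this module default-exports EventSourceStream):

import EventSourceStream from "./event_source_stream.mjs";

// Two SSE frames: one JSON payload, then the '[DONE]' sentinel.
const body = new Blob([
  'data: {"choices":[{"delta":{"content":"Hi"}}]}\n\n',
  "data: [DONE]\n\n",
]).stream();

// EventSourceStream implements the TransformStream interface, so it can be piped through.
for await (const event of body.pipeThrough(new EventSourceStream())) {
  // Only the JSON frame is emitted; '[DONE]' terminates the stream instead of being enqueued.
  console.log(event.data);
}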

ext/ai/js/llm/utils/json_parser.ts

Lines changed: 1 addition & 0 deletions
@@ -71,6 +71,7 @@ export async function* parseJSONOverEventStream<T extends object>(
     const { done, value } = await reader.read();
 
     if (done) {
+      yield { done };
       break;
     }

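The added `yield { done }` is what makes end-of-stream observable to callers such as the provider loop in openai.ts above, which checks `"done" in message` before parsing. A hypothetical consumer mirroring that loop (arguments to the parser elided):

for await (const message of parseJSONOverEventStream(/* response body, signal */)) {
  if ("done" in message && message.done) {
    break; // reachable now that the parser yields { done } before exiting
  }
  if ("error" in message) {
    throw message.error instanceof Error ? message.error : new Error(String(message.error));
  }
  // ...handle a parsed JSON chunk
}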