This repository was archived by the owner on Mar 18, 2025. It is now read-only.

Commit b15ad06

Stream (#23)
* Fix: events should only be created once
* chore: add changeset

---------

Co-authored-by: jiang-zhexin <162887873+jiang-zhexin@users.noreply.github.com>
Parent: 24e68bd

File tree

.changeset/twenty-moles-tap.md
packages/ai-provider/src/workersai-chat-language-model.ts

2 files changed: +36, -40 lines


.changeset/twenty-moles-tap.md

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+---
+"workers-ai-provider": patch
+---
+
+Fix streaming output by ensuring that events is only called once per stream
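The changeset line above is the whole story of the bug: the old `doStream` created a fresh SSE parser for every network chunk, but a server-sent event can straddle `Uint8Array` chunk boundaries, so a frame split across two reads was silently dropped. Calling `events` once per stream lets the parser keep its buffer alive between chunks. Below is a minimal sketch of why that buffer matters, using a hand-rolled parser in place of the repo's `events` helper; the `sseEvents` name and implementation are illustrative only, not the library's actual code.

// Illustrative stand-in for an SSE parser such as the `events` helper.
// The key property: `buffer` persists across chunks, so a frame split
// between two reads is reassembled. A new parser per chunk (the old
// approach) discards this buffer and loses the partial frame.
async function* sseEvents(body: ReadableStream<Uint8Array>) {
  const decoder = new TextDecoder();
  let buffer = "";
  const reader = body.getReader();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    let sep: number;
    // SSE frames are delimited by a blank line ("\n\n").
    while ((sep = buffer.indexOf("\n\n")) !== -1) {
      const frame = buffer.slice(0, sep);
      buffer = buffer.slice(sep + 2);
      const data = frame
        .split("\n")
        .filter((line) => line.startsWith("data:"))
        .map((line) => line.slice(5).trimStart())
        .join("\n");
      if (data) yield { data };
    }
  }
}

With this shape, constructing the parser once per response (as the commit now does with `events(new Response(response))`) preserves in-flight state; constructing it once per chunk cannot.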

packages/ai-provider/src/workersai-chat-language-model.ts

Lines changed: 31 additions & 40 deletions

@@ -160,9 +160,9 @@ export class WorkersAIChatLanguageModel implements LanguageModelV1 {
       finishReason: "stop", // TODO: mapWorkersAIFinishReason(response.finish_reason),
       rawCall: { rawPrompt: args.messages, rawSettings: args },
       usage: {
-        // TODO: mapWorkersAIUsage(response.usage),
-        promptTokens: 0,
-        completionTokens: 0,
+        // TODO: mapWorkersAIUsage(response.usage),
+        promptTokens: 0,
+        completionTokens: 0,
       },
       warnings,
     };
@@ -173,8 +173,6 @@ export class WorkersAIChatLanguageModel implements LanguageModelV1 {
   ): Promise<Awaited<ReturnType<LanguageModelV1["doStream"]>>> {
     const { args, warnings } = this.getArgs(options);
 
-    const decoder = new TextDecoder();
-
     const response = await this.config.binding.run(args.model, {
       messages: args.messages,
       stream: true,
@@ -185,45 +183,38 @@ export class WorkersAIChatLanguageModel implements LanguageModelV1 {
       throw new Error("This shouldn't happen");
     }
 
-    return {
-      stream: response.pipeThrough(
-        new TransformStream<Uint8Array, LanguageModelV1StreamPart>({
-          async transform(chunk, controller) {
-            const chunkToText = decoder.decode(chunk);
-            const chunks = events(new Response(chunkToText));
-            for await (const singleChunk of chunks) {
-              if (!singleChunk.data) {
-                continue;
-              }
-              if (singleChunk.data === "[DONE]") {
-                controller.enqueue({
-                  type: "finish",
-                  finishReason: "stop",
-                  usage: {
-                    promptTokens: 0,
-                    completionTokens: 0,
-                  },
-                });
-                return;
-              }
-              const data = JSON.parse(singleChunk.data);
+    const chunkEvent = events(new Response(response));
+    const usage = { promptTokens: 0, completionTokens: 0 };
 
+    return {
+      stream: new ReadableStream<LanguageModelV1StreamPart>({
+        async start(controller) {
+          for await (const event of chunkEvent) {
+            if (!event.data) {
+              continue;
+            }
+            if (event.data === "[DONE]") {
+              break;
+            }
+            const chunk = JSON.parse(event.data);
+            if (chunk.usage) {
+              usage.promptTokens = chunk.usage.prompt_tokens ?? 0;
+              usage.completionTokens = chunk.usage.completion_tokens ?? 0;
+            }
+            chunk.response.length &&
              controller.enqueue({
                type: "text-delta",
-               textDelta: data.response ?? "DATALOSS",
+               textDelta: chunk.response,
              });
-            }
-            controller.enqueue({
-              type: "finish",
-              finishReason: "stop",
-              usage: {
-                promptTokens: 0,
-                completionTokens: 0,
-              },
-            });
-          },
-        })
-      ),
+          }
+          controller.enqueue({
+            type: "finish",
+            finishReason: "stop",
+            usage: usage,
+          });
+          controller.close();
+        },
+      }),
       rawCall: { rawPrompt: args.messages, rawSettings: args },
       warnings,
     };
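For context, here is a hedged sketch of how a caller might drain the parts the rewritten `doStream` emits. The `model` instance and the exact call-option shape are assumptions for illustration (based on the LanguageModelV1 interface the class implements), not part of this commit:

// Hypothetical consumer; `model` is assumed to be a constructed
// WorkersAIChatLanguageModel bound to a Workers AI binding.
const { stream } = await model.doStream({
  inputFormat: "messages",
  mode: { type: "regular" },
  prompt: [{ role: "user", content: [{ type: "text", text: "Hello" }] }],
});

const reader = stream.getReader();
let text = "";
for (;;) {
  const { done, value } = await reader.read();
  if (done) break;
  if (value.type === "text-delta") {
    text += value.textDelta; // accumulate streamed tokens
  } else if (value.type === "finish") {
    console.log(value.finishReason, value.usage); // "stop" plus token counts
  }
}

Note the design choice visible in the diff: usage is accumulated into a mutable `usage` object as events arrive, so the single terminal `finish` part can report whatever token counts the model supplied, instead of the hard-coded zeros the old per-chunk version emitted.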
