Skip to content

Commit d61614f

Browse files
committed
stamp: moving streaming utils to llm folder
- Extracting JSON parsers into a separate file - Moving LLM stream-related code into a separate folder
1 parent 1838447 commit d61614f

File tree

5 files changed

+212
-84
lines changed

5 files changed

+212
-84
lines changed

ext/ai/js/ai.js

Lines changed: 2 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,88 +1,8 @@
1-
import "ext:ai/onnxruntime/onnx.js";
2-
import EventSourceStream from "ext:ai/util/event_source_stream.mjs";
1+
import 'ext:ai/onnxruntime/onnx.js';
2+
import { parseJSON, parseJSONOverEventStream } from './llm/utils/json_parser.ts';
33

44
const core = globalThis.Deno.core;
55

6-
/**
7-
* @param {ReadableStream<Uint8Array} itr
8-
* @param {AbortSignal} signal
9-
*/
10-
const parseJSON = async function* (itr, signal) {
11-
let buffer = "";
12-
13-
const decoder = new TextDecoder("utf-8");
14-
const reader = itr.getReader();
15-
16-
while (true) {
17-
try {
18-
if (signal.aborted) {
19-
reader.cancel(signal.reason);
20-
reader.releaseLock();
21-
return { error: signal.reason };
22-
}
23-
24-
const { done, value } = await reader.read();
25-
26-
if (done) {
27-
break;
28-
}
29-
30-
buffer += decoder.decode(value);
31-
32-
const parts = buffer.split("\n");
33-
34-
buffer = parts.pop() ?? "";
35-
36-
for (const part of parts) {
37-
yield JSON.parse(part);
38-
}
39-
} catch (error) {
40-
yield { error };
41-
}
42-
}
43-
44-
for (const part of buffer.split("\n").filter((p) => p !== "")) {
45-
try {
46-
yield JSON.parse(part);
47-
} catch (error) {
48-
yield { error };
49-
}
50-
}
51-
};
52-
53-
/**
54-
* @param {ReadableStream<Uint8Array} itr
55-
* @param {AbortSignal} signal
56-
*/
57-
const parseJSONOverEventStream = async function* (itr, signal) {
58-
const decoder = new EventSourceStream();
59-
60-
itr.pipeThrough(decoder);
61-
62-
/** @type {ReadableStreamDefaultReader<MessageEvent>} */
63-
const reader = decoder.readable.getReader();
64-
65-
while (true) {
66-
try {
67-
if (signal.aborted) {
68-
reader.cancel(signal.reason);
69-
reader.releaseLock();
70-
return { error: signal.reason };
71-
}
72-
73-
const { done, value } = await reader.read();
74-
75-
if (done) {
76-
break;
77-
}
78-
79-
yield JSON.parse(value.data);
80-
} catch (error) {
81-
yield { error };
82-
}
83-
}
84-
};
85-
866
class Session {
877
model;
888
init;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import EventStreamParser from './event_stream_parser.mjs';

/**
 * A Web TransformStream that converts a binary ReadableStream (such as a
 * fetch() response body) carrying Server-Sent Events into a stream of
 * MessageEvent objects, one per parsed event.
 */
class EventSourceStream {
  constructor() {
    // Two important things to note here:
    // 1. The SSE spec allows for an optional UTF-8 BOM.
    // 2. We have to use a *streaming* decoder, in case two adjacent data
    //    chunks are split up in the middle of a multibyte Unicode character.
    //    Trying to parse the two separately would result in data corruption.
    const byteDecoder = new TextDecoderStream('utf-8');

    let sseParser;
    const eventStream = new TransformStream({
      start(controller) {
        // The parser callback fires once per complete SSE event; each one is
        // forwarded downstream as a MessageEvent.
        sseParser = new EventStreamParser((data, eventType, lastEventId) => {
          controller.enqueue(new MessageEvent(eventType, { data, lastEventId }));
        });
      },
      transform(textChunk) {
        sseParser.push(textChunk);
      },
    });

    byteDecoder.readable.pipeThrough(eventStream);

    // Expose the pair as a TransformStream-compatible { readable, writable }.
    this.readable = eventStream.readable;
    this.writable = byteDecoder.writable;
  }
}

export default EventSourceStream;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// https://github.com/valadaptive/server-sent-stream
2+
3+
/**
4+
* A parser for the server-sent events stream format.
5+
*
6+
* Note that this parser does not handle text decoding! To do it correctly, use a streaming text decoder, since the
7+
* stream could be split up mid-Unicode character, and decoding each chunk at once could lead to incorrect results.
8+
*
9+
* This parser is used by streaming chunks in using the {@link push} method, and then calling the {@link end} method
10+
* when the stream has ended.
11+
*/
12+
class EventStreamParser {
13+
/**
14+
* Construct a new parser for a single stream.
15+
* @param onEvent A callback which will be called for each new event parsed. The parameters in order are the
16+
* event data, the event type, and the last seen event ID. This may be called none, once, or many times per push()
17+
* call, and may be called from the end() call.
18+
*/
19+
constructor(onEvent) {
20+
this.streamBuffer = "";
21+
this.lastEventId = "";
22+
this.onEvent = onEvent;
23+
}
24+
/**
25+
* Process a single incoming chunk of the event stream.
26+
*/
27+
_processChunk() {
28+
// Events are separated by two newlines
29+
const events = this.streamBuffer.split(/\r\n\r\n|\r\r|\n\n/g);
30+
if (events.length === 0) {
31+
return;
32+
}
33+
// The leftover text to remain in the buffer is whatever doesn't have two newlines after it. If the buffer ended
34+
// with two newlines, this will be an empty string.
35+
this.streamBuffer = events.pop();
36+
for (const eventChunk of events) {
37+
let eventType = "";
38+
// Split up by single newlines.
39+
const lines = eventChunk.split(/\n|\r|\r\n/g);
40+
let eventData = "";
41+
for (const line of lines) {
42+
const lineMatch = /([^:]+)(?:: ?(.*))?/.exec(line);
43+
if (lineMatch) {
44+
const field = lineMatch[1];
45+
const value = lineMatch[2] || "";
46+
switch (field) {
47+
case "event":
48+
eventType = value;
49+
break;
50+
case "data":
51+
eventData += value;
52+
eventData += "\n";
53+
break;
54+
case "id":
55+
// The ID field cannot contain null, per the spec
56+
if (!value.includes("\0")) {
57+
this.lastEventId = value;
58+
}
59+
break;
60+
// We do nothing for the `delay` type, and other types are explicitly ignored
61+
}
62+
}
63+
}
64+
// https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage
65+
// Skip the event if the data buffer is the empty string.
66+
if (eventData === "") {
67+
continue;
68+
}
69+
if (eventData[eventData.length - 1] === "\n") {
70+
eventData = eventData.slice(0, -1);
71+
}
72+
// Trim the *last* trailing newline only.
73+
this.onEvent(eventData, eventType || "message", this.lastEventId);
74+
}
75+
}
76+
/**
77+
* Push a new chunk of data to the parser. This may cause the {@link onEvent} callback to be called, possibly
78+
* multiple times depending on the number of events contained within the chunk.
79+
* @param chunk The incoming chunk of data.
80+
*/
81+
push(chunk) {
82+
this.streamBuffer += chunk;
83+
this._processChunk();
84+
}
85+
/**
86+
* Indicate that the stream has ended.
87+
*/
88+
end() {
89+
// This is a no-op
90+
}
91+
}
92+
export default EventStreamParser;

ext/ai/js/llm/utils/json_parser.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import EventSourceStream from './event_source_stream.mjs';
2+
3+
// Adapted from https://github.com/ollama/ollama-js/blob/6a4bfe3ab033f611639dfe4249bdd6b9b19c7256/src/utils.ts#L262
4+
// TODO:(kallebysantos) need to simplify it
5+
export async function* parseJSON<T extends object>(
6+
itr: ReadableStream<Uint8Array>,
7+
signal: AbortSignal,
8+
) {
9+
let buffer = '';
10+
11+
const decoder = new TextDecoder('utf-8');
12+
const reader = itr.getReader();
13+
14+
while (true) {
15+
try {
16+
if (signal.aborted) {
17+
reader.cancel(signal.reason);
18+
reader.releaseLock();
19+
return { error: signal.reason };
20+
}
21+
22+
const { done, value } = await reader.read();
23+
24+
if (done) {
25+
break;
26+
}
27+
28+
buffer += decoder.decode(value);
29+
30+
const parts = buffer.split('\n');
31+
32+
buffer = parts.pop() ?? '';
33+
34+
for (const part of parts) {
35+
yield JSON.parse(part) as T;
36+
}
37+
} catch (error) {
38+
yield { error };
39+
}
40+
}
41+
42+
for (const part of buffer.split('\n').filter((p) => p !== '')) {
43+
try {
44+
yield JSON.parse(part) as T;
45+
} catch (error) {
46+
yield { error };
47+
}
48+
}
49+
}
50+
51+
// TODO:(kallebysantos) need to simplify it
52+
export async function* parseJSONOverEventStream(
53+
itr: ReadableStream<Uint8Array>,
54+
signal: AbortSignal,
55+
) {
56+
const decoder = new EventSourceStream();
57+
58+
itr.pipeThrough(decoder);
59+
60+
const reader: ReadableStreamDefaultReader<MessageEvent> = decoder.readable
61+
.getReader();
62+
63+
while (true) {
64+
try {
65+
if (signal.aborted) {
66+
reader.cancel(signal.reason);
67+
reader.releaseLock();
68+
return { error: signal.reason };
69+
}
70+
71+
const { done, value } = await reader.read();
72+
73+
if (done) {
74+
break;
75+
}
76+
77+
yield JSON.parse(value.data);
78+
} catch (error) {
79+
yield { error };
80+
}
81+
}
82+
}

ext/ai/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,12 @@ deno_core::extension!(
5252
esm = [
5353
dir "js",
5454
"ai.js",
55-
"util/event_stream_parser.mjs",
56-
"util/event_source_stream.mjs",
5755
"onnxruntime/onnx.js",
5856
"onnxruntime/cache_adapter.js",
5957
"llm/llm_session.ts",
58+
"llm/utils/json_parser.ts",
59+
"llm/utils/event_stream_parser.mjs",
60+
"llm/utils/event_source_stream.mjs",
6061
]
6162
);
6263

0 commit comments

Comments
 (0)