Commit 6cafcc7

stainless-bot authored and RobertCraigie committed
fix: optimize sse chunk reading off-by-one error
1 parent 4bb7745 commit 6cafcc7

4 files changed: +161 -127 lines changed
src/internal/decoders/line.ts

Lines changed: 31 additions & 0 deletions
@@ -147,3 +147,34 @@ function findNewlineIndex(
 
   return null;
 }
+
+export function findDoubleNewlineIndex(buffer: Uint8Array): number {
+  // This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n)
+  // and returns the index right after the first occurrence of any pattern,
+  // or -1 if none of the patterns are found.
+  const newline = 0x0a; // \n
+  const carriage = 0x0d; // \r
+
+  for (let i = 0; i < buffer.length - 1; i++) {
+    if (buffer[i] === newline && buffer[i + 1] === newline) {
+      // \n\n
+      return i + 2;
+    }
+    if (buffer[i] === carriage && buffer[i + 1] === carriage) {
+      // \r\r
+      return i + 2;
+    }
+    if (
+      buffer[i] === carriage &&
+      buffer[i + 1] === newline &&
+      i + 3 < buffer.length &&
+      buffer[i + 2] === carriage &&
+      buffer[i + 3] === newline
+    ) {
+      // \r\n\r\n
+      return i + 4;
+    }
+  }
+
+  return -1;
+}
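
The substantive change, beyond moving this helper out of src/streaming.ts, is the loop bound: the old copy iterated with i < buffer.length - 2, so a terminator sitting at the very end of the buffer was never inspected, while the moved copy uses i < buffer.length - 1 and catches it. A minimal TypeScript sketch of the difference (the input string is illustrative, not taken from the diff):

// Illustration only (not part of the commit): an SSE event whose \n\n
// terminator is the last thing in the buffer.
const bytes = new TextEncoder().encode('data: hello\n\n'); // 13 bytes

// Old bound: for (let i = 0; i < buffer.length - 2; i++) stops at i = 10 and
// never inspects the pair starting at i = 11, so the result was -1 and the
// complete event stayed buffered until more bytes arrived.
// New bound: for (let i = 0; i < buffer.length - 1; i++) reaches i = 11 and
// returns 13, the index just past the terminator.
console.log(findDoubleNewlineIndex(bytes)); // 13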

src/streaming.ts

Lines changed: 1 addition & 47 deletions
@@ -1,7 +1,7 @@
 import { OpenAIError } from './error';
 import { type ReadableStream } from './internal/shim-types';
 import { makeReadableStream } from './internal/shims';
-import { LineDecoder } from './internal/decoders/line';
+import { findDoubleNewlineIndex, LineDecoder } from './internal/decoders/line';
 import { ReadableStreamToAsyncIterable } from './internal/shims';
 import { isAbortError } from './internal/errors';

@@ -271,37 +271,6 @@ async function* iterSSEChunks(iterator: AsyncIterableIterator<Bytes>): AsyncGene
   }
 }
 
-function findDoubleNewlineIndex(buffer: Uint8Array): number {
-  // This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n)
-  // and returns the index right after the first occurrence of any pattern,
-  // or -1 if none of the patterns are found.
-  const newline = 0x0a; // \n
-  const carriage = 0x0d; // \r
-
-  for (let i = 0; i < buffer.length - 2; i++) {
-    if (buffer[i] === newline && buffer[i + 1] === newline) {
-      // \n\n
-      return i + 2;
-    }
-    if (buffer[i] === carriage && buffer[i + 1] === carriage) {
-      // \r\r
-      return i + 2;
-    }
-    if (
-      buffer[i] === carriage &&
-      buffer[i + 1] === newline &&
-      i + 3 < buffer.length &&
-      buffer[i + 2] === carriage &&
-      buffer[i + 3] === newline
-    ) {
-      // \r\n\r\n
-      return i + 4;
-    }
-  }
-
-  return -1;
-}
-
 class SSEDecoder {
   private data: string[];
   private event: string | null;
@@ -357,21 +326,6 @@ class SSEDecoder {
   }
 }
 
-/** This is an internal helper function that's just used for testing */
-export function _decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
-  const decoder = new LineDecoder();
-  const lines: string[] = [];
-  for (const chunk of chunks) {
-    lines.push(...decoder.decode(chunk));
-  }
-
-  if (flush) {
-    lines.push(...decoder.flush());
-  }
-
-  return lines;
-}
-
 function partition(str: string, delimiter: string): [string, string, string] {
   const index = str.indexOf(delimiter);
   if (index !== -1) {
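
For context, here is a sketch of the accumulate-and-split pattern this helper supports in src/streaming.ts. This is an illustrative reconstruction, not the SDK's actual iterSSEChunks, whose typing, abort handling, and flushing details differ:

async function* sseChunks(iterator: AsyncIterable<Uint8Array>): AsyncGenerator<Uint8Array> {
  let data = new Uint8Array(0);
  for await (const chunk of iterator) {
    // append incoming bytes to whatever is still buffered
    const merged = new Uint8Array(data.length + chunk.length);
    merged.set(data);
    merged.set(chunk, data.length);
    data = merged;

    // emit every complete event (terminated by \n\n, \r\r, or \r\n\r\n)
    let end = findDoubleNewlineIndex(data);
    while (end !== -1) {
      yield data.slice(0, end);
      data = data.slice(end);
      end = findDoubleNewlineIndex(data);
    }
  }
  if (data.length > 0) {
    yield data; // flush anything left when the stream ends
  }
}
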
Lines changed: 128 additions & 0 deletions
@@ -0,0 +1,128 @@
+import { findDoubleNewlineIndex, LineDecoder } from 'openai/internal/decoders/line';
+
+function decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
+  const decoder = new LineDecoder();
+  const lines: string[] = [];
+  for (const chunk of chunks) {
+    lines.push(...decoder.decode(chunk));
+  }
+
+  if (flush) {
+    lines.push(...decoder.flush());
+  }
+
+  return lines;
+}
+
+describe('line decoder', () => {
+  test('basic', () => {
+    // baz is not included because the line hasn't ended yet
+    expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
+  });
+
+  test('basic with \\r', () => {
+    expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
+    expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
+  });
+
+  test('trailing new lines', () => {
+    expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
+  });
+
+  test('trailing new lines with \\r', () => {
+    expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
+  });
+
+  test('escaped new lines', () => {
+    expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
+  });
+
+  test('escaped new lines with \\r', () => {
+    expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
+  });
+
+  test('\\r & \\n split across multiple chunks', () => {
+    expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
+  });
+
+  test('single \\r', () => {
+    expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
+  });
+
+  test('double \\r', () => {
+    expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
+    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    // implementation detail that we don't yield the single \r line until a new \r or \n is encountered
+    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
+  });
+
+  test('double \\r then \\r\\n', () => {
+    expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
+    expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
+  });
+
+  test('double newline', () => {
+    expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
+  });
+
+  test('multi-byte characters across chunks', () => {
+    const decoder = new LineDecoder();
+
+    // bytes taken from the string 'известни' and arbitrarily split
+    // so that some multi-byte characters span multiple chunks
+    expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
+    expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
+    expect(
+      decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
+    ).toHaveLength(0);
+
+    const decoded = decoder.decode(new Uint8Array([0xa]));
+    expect(decoded).toEqual(['известни']);
+  });
+
+  test('flushing trailing newlines', () => {
+    expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
+  });
+
+  test('flushing empty buffer', () => {
+    expect(decodeChunks([], { flush: true })).toEqual([]);
+  });
+});
+
+describe('findDoubleNewlineIndex', () => {
+  test('finds \\n\\n', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\nbar'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\nbar'))).toBe(2);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\n'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\n'))).toBe(2);
+  });
+
+  test('finds \\r\\r', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\rbar'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\rbar'))).toBe(2);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\r'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\r'))).toBe(2);
+  });
+
+  test('finds \\r\\n\\r\\n', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\nbar'))).toBe(7);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\nbar'))).toBe(4);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\n'))).toBe(7);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\n'))).toBe(4);
+  });
+
+  test('returns -1 when no double newline found', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\nbar'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\rbar'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\nbar'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode(''))).toBe(-1);
+  });
+
+  test('handles incomplete patterns', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n'))).toBe(-1);
+  });
+});
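
The findDoubleNewlineIndex tests also pin down the return-value convention: the index points just past the terminator, so callers can split a buffer with plain slice calls. A small illustrative example (values are hypothetical, not taken from the tests):

const buf = new TextEncoder().encode('event: a\n\nevent: b\n\n');
const end = findDoubleNewlineIndex(buf); // 10: one past the first \n\n
const firstEvent = buf.slice(0, end);    // bytes for 'event: a\n\n'
const rest = buf.slice(end);             // bytes for 'event: b\n\n'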

tests/streaming.test.ts

Lines changed: 1 addition & 80 deletions
@@ -1,85 +1,6 @@
 import { PassThrough } from 'stream';
 import assert from 'assert';
-import { _iterSSEMessages, _decodeChunks as decodeChunks } from 'openai/streaming';
-import { LineDecoder } from 'openai/internal/decoders/line';
-
-describe('line decoder', () => {
-  test('basic', () => {
-    // baz is not included because the line hasn't ended yet
-    expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
-  });
-
-  test('basic with \\r', () => {
-    expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
-    expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
-  });
-
-  test('trailing new lines', () => {
-    expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
-  });
-
-  test('trailing new lines with \\r', () => {
-    expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
-  });
-
-  test('escaped new lines', () => {
-    expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
-  });
-
-  test('escaped new lines with \\r', () => {
-    expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
-  });
-
-  test('\\r & \\n split across multiple chunks', () => {
-    expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
-  });
-
-  test('single \\r', () => {
-    expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
-  });
-
-  test('double \\r', () => {
-    expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
-    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    // implementation detail that we don't yield the single \r line until a new \r or \n is encountered
-    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
-  });
-
-  test('double \\r then \\r\\n', () => {
-    expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
-    expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
-  });
-
-  test('double newline', () => {
-    expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
-  });
-
-  test('multi-byte characters across chunks', () => {
-    const decoder = new LineDecoder();
-
-    // bytes taken from the string 'известни' and arbitrarily split
-    // so that some multi-byte characters span multiple chunks
-    expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
-    expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
-    expect(
-      decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
-    ).toHaveLength(0);
-
-    const decoded = decoder.decode(new Uint8Array([0xa]));
-    expect(decoded).toEqual(['известни']);
-  });
-
-  test('flushing trailing newlines', () => {
-    expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
-  });
-
-  test('flushing empty buffer', () => {
-    expect(decodeChunks([], { flush: true })).toEqual([]);
-  });
-});
+import { _iterSSEMessages } from 'openai/streaming';
 
 describe('streaming decoding', () => {
   test('basic', async () => {