Skip to content

Commit dad5cf8

Browse files
feat(common): add parseSSEStream function
This function parses the raw buffer recieved from sse requests and emits completed chunks at O(n)
1 parent 00f065a commit dad5cf8

File tree

1 file changed

+93
-0
lines changed

1 file changed

+93
-0
lines changed

lib/common.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os = require('os');
2+
import { EventEmitter } from 'events';
23

34
// tslint:disable-next-line:no-var-requires
45
const pkg = require('../package.json');
@@ -30,3 +31,95 @@ export function getSdkHeaders(serviceName: string, serviceVersion: string, opera
3031

3132
return headers;
3233
}
34+
35+
export function parseSSEStream(stream: NodeJS.ReadableStream) {
36+
let eventMemoryKey = ''
37+
let dataMemoryKey = ''
38+
let stringBuffer = ''
39+
let eventMessage = '' // Can be used in the future to return the event message to the user
40+
const eventEmitter = new EventEmitter()
41+
42+
stream.on('data', (chunk: Buffer) => {
43+
const chunkString = chunk.toString()
44+
45+
for (const char of chunkString) {
46+
// Looks for "event: "
47+
if (char === 'e' && eventMemoryKey === '') {
48+
eventMemoryKey += char
49+
} else if (char === 'v' && eventMemoryKey === 'e') {
50+
eventMemoryKey += char
51+
} else if (char === 'e' && eventMemoryKey === 'ev') {
52+
eventMemoryKey += char
53+
} else if (char === 'n' && eventMemoryKey === 'eve') {
54+
eventMemoryKey += char
55+
} else if (char === 't' && eventMemoryKey === 'even') {
56+
eventMemoryKey += char
57+
} else if (char === ':' && eventMemoryKey === 'event') {
58+
eventMemoryKey += char
59+
} else if (char === ' ' && eventMemoryKey === 'event:') {
60+
eventMemoryKey += char
61+
} else if (eventMemoryKey === 'event: ' && dataMemoryKey !== 'data: ') {
62+
stringBuffer += char // Start saving characters in string buffer
63+
} else if (eventMemoryKey === 'event: ' && dataMemoryKey === 'data: ') {
64+
// Must remove extra 'event: ' added to stringBuffer
65+
const trimmedStringBuffer = stringBuffer.substring(0, stringBuffer.length - 'event: '.length)
66+
67+
try {
68+
const parsedBuffer = JSON.parse(trimmedStringBuffer)
69+
eventEmitter.emit('data', parsedBuffer)
70+
} catch(e) {
71+
if (!(e instanceof SyntaxError)) {
72+
throw Error('Unexpected error when trying to emit data event')
73+
}
74+
// stringBuffer was not parseable, we must continue and wait for more chunks
75+
}
76+
77+
stringBuffer = char
78+
dataMemoryKey = ''
79+
} else {
80+
eventMemoryKey = ''
81+
}
82+
// Looks for "data: "
83+
if (char === 'd' && eventMemoryKey === 'event: ') {
84+
dataMemoryKey += char
85+
} else if (char === 'a' && dataMemoryKey === 'd') {
86+
dataMemoryKey += char
87+
} else if (char === 't' && dataMemoryKey === 'da') {
88+
dataMemoryKey += char
89+
} else if (char === 'a' && dataMemoryKey === 'dat') {
90+
dataMemoryKey += char
91+
} else if (char === ':' && dataMemoryKey === 'data') {
92+
dataMemoryKey += char
93+
} else if (char === ' ' && dataMemoryKey === 'data:') {
94+
dataMemoryKey += char
95+
96+
// Must remove extra 'data: ' and '/n' added to stringBuffer
97+
stringBuffer = stringBuffer.substring(0, stringBuffer.length - 'data: '.length - 1)
98+
99+
// Save and clear stringBuffer
100+
eventMessage = stringBuffer
101+
stringBuffer = ''
102+
eventMemoryKey = ''
103+
} else if (dataMemoryKey === 'data: ') {
104+
stringBuffer += char // Start saving characters in string buffer
105+
}
106+
else {
107+
dataMemoryKey = ''
108+
}
109+
}
110+
111+
try {
112+
const parsedBuffer = JSON.parse(stringBuffer)
113+
eventEmitter.emit('data', parsedBuffer)
114+
115+
stringBuffer = ''
116+
} catch(e) {
117+
if (!(e instanceof SyntaxError)) {
118+
throw Error('Unexpected error when trying to emit data event')
119+
}
120+
// stringBuffer was not parseable, we must continue and wait for more chunks
121+
}
122+
});
123+
124+
return eventEmitter
125+
}

0 commit comments

Comments
 (0)