Skip to content

Commit 1998816

Browse files
feat(common): add parseSSEStream function
This function parses the raw buffer recieved from sse requests and emits completed chunks at O(n)
1 parent 00f065a commit 1998816

File tree

1 file changed

+99
-0
lines changed

1 file changed

+99
-0
lines changed

lib/common.ts

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os = require('os');
2+
import { EventEmitter } from 'events';
23

34
// tslint:disable-next-line:no-var-requires
45
const pkg = require('../package.json');
@@ -30,3 +31,101 @@ export function getSdkHeaders(serviceName: string, serviceVersion: string, opera
3031

3132
return headers;
3233
}
34+
35+
export function parseSSEStream(stream: NodeJS.ReadableStream) {
36+
let eventMemoryKey = ''
37+
let dataMemoryKey = ''
38+
let stringBuffer = ''
39+
let eventMessage = ''
40+
const eventEmitter = new EventEmitter()
41+
42+
stream.on('data', (chunk: Buffer) => {
43+
const chunkString = chunk.toString()
44+
45+
for (const char of chunkString) {
46+
// Looks for "event: "
47+
if (char === 'e' && eventMemoryKey === '') {
48+
eventMemoryKey += char
49+
} else if (char === 'v' && eventMemoryKey === 'e') {
50+
eventMemoryKey += char
51+
} else if (char === 'e' && eventMemoryKey === 'ev') {
52+
eventMemoryKey += char
53+
} else if (char === 'n' && eventMemoryKey === 'eve') {
54+
eventMemoryKey += char
55+
} else if (char === 't' && eventMemoryKey === 'even') {
56+
eventMemoryKey += char
57+
} else if (char === ':' && eventMemoryKey === 'event') {
58+
eventMemoryKey += char
59+
} else if (char === ' ' && eventMemoryKey === 'event:') {
60+
eventMemoryKey += char
61+
} else if (eventMemoryKey === 'event: ' && dataMemoryKey !== 'data: ') {
62+
stringBuffer += char // Start saving characters in string buffer
63+
} else if (eventMemoryKey === 'event: ' && dataMemoryKey === 'data: ') {
64+
// Must remove extra 'event: ' added to stringBuffer
65+
const trimmedStringBuffer = stringBuffer.substring(0, stringBuffer.length - 'event: '.length)
66+
67+
try {
68+
const parsedBuffer = JSON.parse(trimmedStringBuffer)
69+
eventEmitter.emit('data', {
70+
event: eventMessage,
71+
data: parsedBuffer
72+
})
73+
} catch(e) {
74+
if (!(e instanceof SyntaxError)) {
75+
throw Error('Unexpected error when trying to emit data event')
76+
}
77+
// stringBuffer was not parseable, we must continue and wait for more chunks
78+
}
79+
80+
stringBuffer = char
81+
dataMemoryKey = ''
82+
} else {
83+
eventMemoryKey = ''
84+
}
85+
// Looks for "data: "
86+
if (char === 'd' && eventMemoryKey === 'event: ') {
87+
dataMemoryKey += char
88+
} else if (char === 'a' && dataMemoryKey === 'd') {
89+
dataMemoryKey += char
90+
} else if (char === 't' && dataMemoryKey === 'da') {
91+
dataMemoryKey += char
92+
} else if (char === 'a' && dataMemoryKey === 'dat') {
93+
dataMemoryKey += char
94+
} else if (char === ':' && dataMemoryKey === 'data') {
95+
dataMemoryKey += char
96+
} else if (char === ' ' && dataMemoryKey === 'data:') {
97+
dataMemoryKey += char
98+
99+
// Must remove extra 'data: ' and '/n' added to stringBuffer
100+
stringBuffer = stringBuffer.substring(0, stringBuffer.length - 'data: '.length - 1)
101+
102+
// Save and clear stringBuffer
103+
eventMessage = stringBuffer
104+
stringBuffer = ''
105+
eventMemoryKey = ''
106+
} else if (dataMemoryKey === 'data: ') {
107+
stringBuffer += char // Start saving characters in string buffer
108+
}
109+
else {
110+
dataMemoryKey = ''
111+
}
112+
}
113+
114+
try {
115+
const parsedBuffer = JSON.parse(stringBuffer)
116+
eventEmitter.emit('data', {
117+
event: eventMessage,
118+
data: parsedBuffer
119+
})
120+
121+
stringBuffer = ''
122+
} catch(e) {
123+
if (!(e instanceof SyntaxError)) {
124+
throw Error('Unexpected error when trying to emit data event')
125+
}
126+
// stringBuffer was not parseable, we must continue and wait for more chunks
127+
}
128+
});
129+
130+
return eventEmitter
131+
}

0 commit comments

Comments
 (0)