Skip to content

Commit bbb24ac

Browse files
committed
feat(stream-utils): splitStream method was added
1 parent 0269479 commit bbb24ac

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

packages/stream-utils/src/index.spec.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import { spawn } from 'child_process'
2+
import { Readable } from 'stream'
23
import {
34
describe,
45
it,
56
expect
67
} from 'vitest'
78
import {
89
toArray,
9-
mergeReadables
10+
mergeReadables,
11+
splitStream
1012
} from './index.js'
1113

1214
function stream(id: string, time: number) {
@@ -30,4 +32,24 @@ describe('stream-utils', () => {
3032
expect(result).toHaveLength(20)
3133
})
3234
})
35+
36+
describe('splitStream', () => {
37+
it('should split strings stream by separator', async () => {
38+
const stream = Readable.from([
39+
'1 2',
40+
' 3',
41+
' 4 5 6'
42+
])
43+
const result = await toArray(splitStream(stream, ' '))
44+
45+
expect(result).toEqual([
46+
'1',
47+
'2',
48+
'3',
49+
'4',
50+
'5',
51+
'6'
52+
])
53+
})
54+
})
3355
})

packages/stream-utils/src/index.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,30 @@ export function mergeReadables<
8989

9090
return mergedStream
9191
}
92+
93+
/**
94+
* Split stream by separator.
95+
* @param stream
96+
* @param separator
97+
* @yields String chunks.
98+
*/
99+
export async function* splitStream(stream: AsyncIterable<string | Buffer>, separator: string) {
100+
let chunk: string | Buffer
101+
let payload: string[]
102+
let buffer = ''
103+
104+
for await (chunk of stream) {
105+
buffer += chunk.toString()
106+
107+
if (buffer.includes(separator)) {
108+
payload = buffer.split(separator)
109+
buffer = payload.pop() || ''
110+
111+
yield* payload
112+
}
113+
}
114+
115+
if (buffer) {
116+
yield buffer
117+
}
118+
}

0 commit comments

Comments
 (0)