Skip to content

Commit 00d011c

Browse files
committed
feat(stream-utils): small set of utilities for streams
1 parent 6141518 commit 00d011c

File tree

9 files changed

+311
-1
lines changed

9 files changed

+311
-1
lines changed

README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,20 @@ A set of packages with simple utilities.
2020
| Name | Description | Version | Dependencies |
2121
|------|-------------|---------|--------------|
2222
| [`@simple-libs/hosted-git-info`](packages/hosted-git-info#readme) | A small library to parse hosted git info. | [![NPM version][hosted-git-info-npm]][hosted-git-info-npm-url] | [![Dependencies status][hosted-git-info-deps]][hosted-git-info-deps-url] |
23+
| [`@simple-libs/stream-utils`](packages/stream-utils#readme) | A small set of utilities for streams. | [![NPM version][stream-utils-npm]][stream-utils-npm-url] | [![Dependencies status][stream-utils-deps]][stream-utils-deps-url] |
2324

2425
<!-- hosted-git-info -->
2526

2627
[hosted-git-info-npm]: https://img.shields.io/npm/v/@simple-libs/hosted-git-info.svg
2728
[hosted-git-info-npm-url]: https://www.npmjs.com/package/@simple-libs/hosted-git-info
2829

2930
[hosted-git-info-deps]: https://img.shields.io/librariesio/release/npm/@simple-libs/hosted-git-info
30-
[hosted-git-info-deps-url]: https://libraries.io/npm/@simple-libs%2hosted-git-info/tree
31+
[hosted-git-info-deps-url]: https://libraries.io/npm/@simple-libs%2Fhosted-git-info/tree
32+
33+
<!-- stream-utils -->
34+
35+
[stream-utils-npm]: https://img.shields.io/npm/v/@simple-libs/stream-utils.svg
36+
[stream-utils-npm-url]: https://www.npmjs.com/package/@simple-libs/stream-utils
37+
38+
[stream-utils-deps]: https://img.shields.io/librariesio/release/npm/@simple-libs/stream-utils
39+
[stream-utils-deps-url]: https://libraries.io/npm/@simple-libs%2Fstream-utils/tree
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"extends": [
3+
"@trigen/eslint-config/typescript",
4+
"@trigen/eslint-config/typescript-requiring-type-checking",
5+
"@trigen/eslint-config/jest"
6+
],
7+
"parserOptions": {
8+
"tsconfigRootDir": "./packages/stream-utils",
9+
"project": ["./tsconfig.json"]
10+
},
11+
"rules": {
12+
"no-unreachable-loop": "off"
13+
}
14+
}

packages/stream-utils/README.md

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# @simple-libs/stream-utils
2+
3+
[![ESM-only package][package]][package-url]
4+
[![NPM version][npm]][npm-url]
5+
[![Node version][node]][node-url]
6+
[![Dependencies status][deps]][deps-url]
7+
[![Install size][size]][size-url]
8+
[![Build status][build]][build-url]
9+
[![Coverage status][coverage]][coverage-url]
10+
11+
[package]: https://img.shields.io/badge/package-ESM--only-ffe536.svg
12+
[package-url]: https://nodejs.org/api/esm.html
13+
14+
[npm]: https://img.shields.io/npm/v/@simple-libs/stream-utils.svg
15+
[npm-url]: https://www.npmjs.com/package/@simple-libs/stream-utils
16+
17+
[node]: https://img.shields.io/node/v/@simple-libs/stream-utils.svg
18+
[node-url]: https://nodejs.org
19+
20+
[deps]: https://img.shields.io/librariesio/release/npm/@simple-libs/stream-utils
21+
[deps-url]: https://libraries.io/npm/@simple-libs%2Fstream-utils/tree
22+
23+
[size]: https://packagephobia.com/badge?p=@simple-libs/stream-utils
24+
[size-url]: https://packagephobia.com/result?p=@simple-libs/stream-utils
25+
26+
[build]: https://img.shields.io/github/actions/workflow/status/TrigenSoftware/simple-libs/tests.yml?branch=main
27+
[build-url]: https://github.com/TrigenSoftware/simple-libs/actions
28+
29+
[coverage]: https://img.shields.io/codecov/c/github/TrigenSoftware/simple-libs.svg?flag=@simple-libs/stream-utils
30+
[coverage-url]: https://app.codecov.io/gh/TrigenSoftware/simple-libs/tree/main/packages%2Fstream-utils
31+
32+
A small set of utilities for streams.
33+
34+
## Install
35+
36+
```bash
37+
# pnpm
38+
pnpm add @simple-libs/stream-utils
39+
# yarn
40+
yarn add @simple-libs/stream-utils
41+
# npm
42+
npm i @simple-libs/stream-utils
43+
```
44+
45+
## Usage
46+
47+
```ts
48+
import {
49+
toArray,
50+
concatBufferStream,
51+
concatStringStream,
52+
firstFromStream,
53+
mergeReadables
54+
} from '@simple-libs/stream-utils'
55+
56+
// Convert a readable stream to an array
57+
await toArray(Readable.from(['foo', 'bar', 'baz']))
58+
// Returns ['foo', 'bar', 'baz']
59+
60+
// Concatenate a stream of buffers into a single buffer
61+
await concatBufferStream(Readable.from([Buffer.from('foo'), Buffer.from('bar')]))
62+
// Returns <Buffer 66 6f 6f 62 61 72>
63+
64+
// Concatenate a stream of strings into a single string
65+
await concatStringStream(Readable.from(['foo', 'bar']))
66+
// Returns 'foobar'
67+
68+
// Get the first value from a stream
69+
await firstFromStream(Readable.from(['foo', 'bar']))
70+
// Returns 'foo'
71+
72+
// Merges multiple Readable streams into a single Readable stream.
73+
// Each chunk will be an object containing the source stream name and the chunk data.
74+
await mergeReadables({
75+
foo: Readable.from(['foo1', 'foo2']),
76+
bar: Readable.from(['bar1', 'bar2'])
77+
})
78+
// Returns [{ source: 'foo', chunk: 'foo1' }, { source: 'foo', chunk: 'foo2' }, { source: 'bar', chunk: 'bar1' }, { source: 'bar', chunk: 'bar2' }]
79+
```

packages/stream-utils/package.json

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
{
2+
"name": "@simple-libs/stream-utils",
3+
"type": "module",
4+
"version": "1.0.0",
5+
"description": "A small set of utilities for streams.",
6+
"author": {
7+
"name": "Dan Onoshko",
8+
"email": "[email protected]",
9+
"url": "https://github.com/dangreen"
10+
},
11+
"license": "MIT",
12+
"homepage": "https://github.com/TrigenSoftware/simple-libs/tree/master/packages/stream-utils#readme",
13+
"funding": "https://ko-fi.com/dangreen",
14+
"repository": {
15+
"type": "git",
16+
"url": "https://github.com/TrigenSoftware/simple-libs.git",
17+
"directory": "packages/stream-utils"
18+
},
19+
"bugs": {
20+
"url": "https://github.com/TrigenSoftware/simple-libs/issues"
21+
},
22+
"keywords": [
23+
"stream",
24+
"streams",
25+
"utilities",
26+
"utils"
27+
],
28+
"engines": {
29+
"node": ">=18"
30+
},
31+
"exports": "./src/index.ts",
32+
"publishConfig": {
33+
"exports": {
34+
"types": "./dist/index.d.ts",
35+
"import": "./dist/index.js"
36+
},
37+
"directory": "package",
38+
"linkDirectory": false
39+
},
40+
"files": [
41+
"dist"
42+
],
43+
"scripts": {
44+
"clear:package": "del ./package",
45+
"clear:dist": "del ./dist",
46+
"clear": "del ./package ./dist ./coverage",
47+
"prepublishOnly": "run build clear:package clean-publish",
48+
"postpublish": "pnpm clear:package",
49+
"build": "tsc -p tsconfig.build.json",
50+
"lint": "eslint --parser-options tsconfigRootDir:. '**/*.{js,ts}'",
51+
"test:unit": "vitest run --coverage",
52+
"test:types": "tsc --noEmit",
53+
"test": "run -p lint test:unit test:types"
54+
},
55+
"dependencies": {
56+
"@types/node": "^22.0.0"
57+
}
58+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { spawn } from 'child_process'
2+
import {
3+
describe,
4+
it,
5+
expect
6+
} from 'vitest'
7+
import {
8+
toArray,
9+
mergeReadables
10+
} from './index.js'
11+
12+
function stream(id: string, time: number) {
13+
// eslint-disable-next-line no-template-curly-in-string, prefer-template
14+
const child = spawn('node', ['-e', '(async () => { for (let i = 0; i < 10; i++) { await new Promise(r => setTimeout(r, ' + time + ')); console.log(`' + id + '${i}`) }})()'])
15+
16+
return child.stdout
17+
}
18+
19+
describe('stream-utils', () => {
20+
describe('mergeReadables', () => {
21+
it('should merge multiple readable streams', async () => {
22+
const stream1 = stream('a', 10)
23+
const stream2 = stream('b', 25)
24+
const merged = mergeReadables({
25+
a: stream1,
26+
b: stream2
27+
})
28+
const result = await toArray(merged)
29+
30+
expect(result).toHaveLength(20)
31+
})
32+
})
33+
})

packages/stream-utils/src/index.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import { Readable } from 'stream'
2+
3+
/**
4+
* Get all items from an async iterable and return them as an array.
5+
* @param iterable
6+
* @returns A promise that resolves to an array of items.
7+
*/
8+
export async function toArray<T>(iterable: AsyncIterable<T>): Promise<T[]> {
9+
const result: T[] = []
10+
11+
for await (const item of iterable) {
12+
result.push(item)
13+
}
14+
15+
return result
16+
}
17+
18+
/**
19+
* Concatenate all buffers from an async iterable into a single Buffer.
20+
* @param iterable
21+
* @returns A promise that resolves to a single Buffer containing all concatenated buffers.
22+
*/
23+
export async function concatBufferStream(iterable: AsyncIterable<Buffer>) {
24+
return Buffer.concat(await toArray(iterable))
25+
}
26+
27+
/**
28+
* Concatenate all strings from an async iterable into a single string.
29+
* @param iterable
30+
* @returns A promise that resolves to a single string containing all concatenated strings.
31+
*/
32+
export async function concatStringStream(iterable: AsyncIterable<string>) {
33+
return (await toArray(iterable)).join('')
34+
}
35+
36+
/**
37+
* Get the first item from an async iterable.
38+
* @param stream
39+
* @returns A promise that resolves to the first item, or null if the iterable is empty.
40+
*/
41+
export async function firstFromStream<T>(stream: AsyncIterable<T>) {
42+
for await (const tag of stream) {
43+
return tag
44+
}
45+
46+
return null
47+
}
48+
49+
export interface MergedReadableChunk<K extends string, T = Buffer> {
50+
source: K
51+
chunk: T
52+
}
53+
54+
/**
55+
* Merges multiple Readable streams into a single Readable stream.
56+
* Each chunk will be an object containing the source stream name and the chunk data.
57+
* @param streams - An object where keys are stream names and values are Readable streams.
58+
* @returns A merged Readable stream.
59+
*/
60+
export function mergeReadables<
61+
K extends string,
62+
T = Buffer
63+
>(
64+
streams: Record<K, Readable>
65+
): Readable & AsyncIterable<MergedReadableChunk<K, T>> {
66+
const mergedStream = new Readable({
67+
objectMode: true,
68+
read() {
69+
/* no-op */
70+
}
71+
})
72+
let ended = 0
73+
74+
Object.entries(streams as Record<string, Readable>).forEach(([name, stream], _i, entries) => {
75+
stream
76+
.on('data', (chunk: Buffer) => mergedStream.push({
77+
source: name,
78+
chunk
79+
}))
80+
.on('end', () => {
81+
ended += 1
82+
83+
if (ended === entries.length) {
84+
mergedStream.push(null)
85+
}
86+
})
87+
.on('error', err => mergedStream.destroy(err))
88+
})
89+
90+
return mergedStream
91+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"extends": "../../tsconfig.json",
3+
"compilerOptions": {
4+
"outDir": "dist"
5+
},
6+
"include": [
7+
"src"
8+
],
9+
"exclude": [
10+
"**/*.spec.ts"
11+
]
12+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"extends": "./tsconfig.build.json",
3+
"include": [
4+
"src"
5+
],
6+
"exclude": []
7+
}

pnpm-lock.yaml

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)