Skip to content

Commit a2a9a4b

Browse files
author
Stephen Belanger
committed
1.0.0
0 parents  commit a2a9a4b

File tree

5 files changed

+221
-0
lines changed

5 files changed

+221
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
node_modules
2+
package-lock.json

README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# async-iterator-pipe
2+
3+
Pipe between async iterators and streams.
4+
5+
## Install
6+
7+
```sh
8+
npm install async-iterator-pipe
9+
```
10+
11+
## API
12+
13+
### `pipe(source, ...stages)`
14+
15+
The `source` should be an async iterable. Node.js readable
16+
streams have experimental support for async iterators, so
17+
those work too. The `stages` should be a sequence of streams
18+
or functions that accept an async iterator and return a new
19+
async iterator to transform or receive the values from the
20+
source or previous stage in the sequence.
21+
22+
The final stage in the sequence will be returned, much like
23+
`stream.pipe(target)` returns the `target` on Node.js streams.
24+
25+
```js
26+
const pipe = require('async-iterator-pipe')
27+
const http = require('http')
28+
29+
async function* upper (iterator) {
30+
for await (let chunk of iterator) {
31+
yield chunk.toString().toUpperCase()
32+
}
33+
}
34+
35+
const server = http.createServer(async (req, res) => {
36+
try {
37+
req.setEncoding('utf8')
38+
await pipe(req, upper, res)
39+
} catch (err) {
40+
res.writeHead(500, {
41+
'content-type': 'text/plain'
42+
})
43+
res.end(err.message)
44+
}
45+
})
46+
47+
server.listen(3000)
48+
```
49+
50+
---
51+
52+
### Copyright (c) 2019 Stephen Belanger
53+
54+
#### Licensed under MIT License
55+
56+
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
57+
58+
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
59+
60+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

index.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
const assert = require('assert')
2+
3+
function noop () {}
4+
5+
function isFunction (value) {
6+
return typeof value === 'function'
7+
}
8+
9+
function isStream (value) {
10+
return isFunction(value.write) && isFunction(value.end)
11+
}
12+
13+
async function pipeStream (iterator, stream) {
14+
for await (let chunk of iterator) {
15+
stream.write(chunk)
16+
}
17+
stream.end()
18+
}
19+
20+
function pipe (source, target) {
21+
assert(
22+
isFunction(source[Symbol.asyncIterator]),
23+
'source should be an async iterable'
24+
)
25+
26+
if (isStream(target)) {
27+
pipeStream(source, target).catch(noop)
28+
return target
29+
} else if (isFunction(target)) {
30+
return target(source)
31+
} else {
32+
throw new Error('Unrecognized target type')
33+
}
34+
}
35+
36+
function pipeThrough (source, ...rest) {
37+
return rest.reduce(pipe, source)
38+
}
39+
40+
module.exports = pipeThrough

package.json

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"name": "async-iterator-pipe",
3+
"version": "1.0.0",
4+
"description": "Pipe between async iterators and streams",
5+
"main": "index.js",
6+
"scripts": {
7+
"test": "tap test.js"
8+
},
9+
"keywords": [
10+
"async",
11+
"iterator",
12+
"stream",
13+
"pipe"
14+
],
15+
"author": "Stephen Belanger <[email protected]> (https://github.com/qard)",
16+
"license": "MIT",
17+
"devDependencies": {
18+
"tap": "^12.5.2"
19+
}
20+
}

test.js

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
const { PassThrough } = require('stream')
2+
3+
const tap = require('tap')
4+
5+
const pipe = require('./')
6+
7+
async function* produceIterator (n) {
8+
for (let i = 0; i < n; i++) {
9+
yield `produced "${i}"\n`
10+
}
11+
}
12+
13+
function produceStream (n) {
14+
const stream = new PassThrough()
15+
stream.setEncoding('utf8')
16+
for (let i = 0; i < n; i++) {
17+
stream.write(`produced "${i}"\n`)
18+
}
19+
stream.end()
20+
return stream
21+
}
22+
23+
async function* splitLines (iterator) {
24+
let buffer = ''
25+
for await (let item of iterator) {
26+
buffer += item
27+
let position = buffer.indexOf('\n')
28+
while (position >= 0) {
29+
yield buffer.slice(0, position + 1)
30+
buffer = buffer.slice(position + 1)
31+
position = buffer.indexOf('\n')
32+
}
33+
}
34+
}
35+
36+
tap.test('iterator -> stream', t => {
37+
t.plan(5)
38+
39+
const stream = new PassThrough()
40+
stream.setEncoding('utf8')
41+
let i = 0
42+
43+
stream.on('data', chunk => {
44+
t.equal(chunk, `produced "${i++}"\n`)
45+
})
46+
47+
pipe(produceIterator(5), stream)
48+
})
49+
50+
tap.test('stream -> iterator', async t => {
51+
t.plan(5)
52+
53+
let i = 0
54+
55+
for await (let chunk of pipe(produceStream(5), splitLines)) {
56+
t.equal(chunk, `produced "${i++}"\n`)
57+
}
58+
})
59+
60+
tap.test('iterator -> iterator', async t => {
61+
t.plan(5)
62+
63+
async function* upper (iterator) {
64+
for await (let item of iterator) {
65+
yield item.toString().toUpperCase()
66+
}
67+
}
68+
69+
let i = 0
70+
71+
for await (let chunk of pipe(produceIterator(5), upper)) {
72+
t.equal(chunk, `PRODUCED "${i++}"\n`)
73+
}
74+
})
75+
76+
tap.test('stream -> iterator -> stream', t => {
77+
t.plan(5)
78+
79+
const stream = new PassThrough()
80+
stream.setEncoding('utf8')
81+
let i = 0
82+
83+
stream.on('data', chunk => {
84+
t.equal(chunk, `produced "${i++}"\n`)
85+
})
86+
87+
pipe(produceStream(5), splitLines, stream)
88+
})
89+
90+
tap.test('iterator -> stream -> iterator', async t => {
91+
t.plan(5)
92+
93+
const stream = new PassThrough()
94+
let i = 0
95+
96+
for await (let chunk of pipe(produceIterator(5), stream, splitLines)) {
97+
t.equal(chunk, `produced "${i++}"\n`)
98+
}
99+
})

0 commit comments

Comments
 (0)