Skip to content

Commit a803605

Browse files
committed
Use chunk utility on streams.
1 parent ae4ebbb commit a803605

File tree

5 files changed

+69
-28
lines changed

5 files changed

+69
-28
lines changed

package.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@
2020
"main": "./lib/index.js",
2121
"module": "./lib/index.js",
2222
"exports": {
23-
"types": "./@type/index.d.ts",
24-
"default": "./lib/index.js"
23+
"./package.json": "./package.json",
24+
".": {
25+
"types": "./@type/index.d.ts",
26+
"import": "./lib/index.js",
27+
"default": "./lib/index.js"
28+
}
2529
},
2630
"types": "./@type/index.d.ts",
2731
"scripts": {

src/util/chunk.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
export const MAX_CHUNK_SIZE = 65536
22

3+
/**
4+
* Chunks given `value` into evenly sized pieces (Same as `MAX_CHUNK_SIZE` bytes per each).
5+
* Returns `Generator<Uint8Array>` allowing to iterate over these chunks.
6+
*
7+
* If value is less then `MAX_CHUNK_SIZE`, it will be returned as-is.
8+
*
9+
* If the last chunk is less than `MAX_CHUNK_SIZE`, then returned value will be the size that chunk.
10+
*
11+
* @param value A value to chunk into evenly sized pieces
12+
*
13+
* @api private
14+
*/
315
export function* chunk(value: Uint8Array): Generator<Uint8Array, void> {
416
if (value.byteLength <= MAX_CHUNK_SIZE) {
517
yield value

src/util/getStreamIterator.test.ts

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,44 @@ import {ReadableStream} from "web-streams-polyfill"
44
import {stub} from "sinon"
55

66
import {getStreamIterator} from "./getStreamIterator.js"
7+
import {isAsyncIterable} from "./isAsyncIterable.js"
78

8-
test(
9-
"Returns readable stream as is, if it implements Symbol.asyncIterator",
9+
test("Returns async iterable for streams w/ Symbol.asyncIterator", t => {
10+
const stream = new ReadableStream()
1011

11-
t => {
12-
const stream = new ReadableStream()
12+
t.true(isAsyncIterable(getStreamIterator(stream)))
13+
})
14+
15+
test("Iterates over given stream", async t => {
16+
const expected = "Some text"
1317

14-
t.is(getStreamIterator(stream), stream)
18+
const stream = new ReadableStream({
19+
pull(controller) {
20+
controller.enqueue(new TextEncoder().encode(expected))
21+
controller.close()
22+
}
23+
})
24+
25+
let actual = ""
26+
const decoder = new TextDecoder()
27+
for await (const chunk of getStreamIterator(stream)) {
28+
actual += decoder.decode(chunk, {stream: true})
1529
}
16-
)
1730

18-
test(
19-
"Returns fallback when given stream does not implement Symbol.asyncIterator",
31+
actual += decoder.decode()
32+
33+
t.is(actual, expected)
34+
})
2035

21-
t => {
22-
const stream = new ReadableStream()
36+
test("Returns async iterable for streams w/o Symbol.asyncIterator", t => {
37+
const stream = new ReadableStream()
2338

24-
stub(stream, Symbol.asyncIterator).get(() => undefined)
39+
stub(stream, Symbol.asyncIterator).get(() => undefined)
2540

26-
t.false(getStreamIterator(stream) instanceof ReadableStream)
27-
}
28-
)
41+
t.false(getStreamIterator(stream) instanceof ReadableStream)
42+
})
2943

30-
test("Reads from the stream using fallback", async t => {
44+
test("Iterates over the stream using fallback", async t => {
3145
const expected = "Some text"
3246

3347
const stream = new ReadableStream({

src/util/getStreamIterator.ts

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,6 @@
1+
import {isAsyncIterable} from "./isAsyncIterable.js"
12
import {isFunction} from "./isFunction.js"
2-
3-
/**
4-
* Checks if the value is async iterable
5-
*/
6-
const isAsyncIterable = (
7-
value: unknown
8-
): value is AsyncIterable<Uint8Array> => (
9-
isFunction((value as AsyncIterable<Uint8Array>)[Symbol.asyncIterator])
10-
)
3+
import {chunk} from "./chunk.js"
114

125
/**
136
* Reads from given ReadableStream
@@ -30,6 +23,14 @@ async function* readStream(
3023
}
3124
}
3225

26+
async function* chunkStream(
27+
stream: AsyncIterable<Uint8Array>
28+
): AsyncGenerator<Uint8Array, void> {
29+
for await (const value of stream) {
30+
yield* chunk(value)
31+
}
32+
}
33+
3334
/**
3435
* Turns ReadableStream into async iterable when the `Symbol.asyncIterable` is not implemented on given stream.
3536
*
@@ -39,11 +40,11 @@ export const getStreamIterator = (
3940
source: ReadableStream<Uint8Array> | AsyncIterable<Uint8Array>
4041
): AsyncIterable<Uint8Array> => {
4142
if (isAsyncIterable(source)) {
42-
return source
43+
return chunkStream(source)
4344
}
4445

4546
if (isFunction(source.getReader)) {
46-
return readStream(source)
47+
return chunkStream(readStream(source))
4748
}
4849

4950
// Throw an error otherwise (for example, in case if encountered Node.js Readable stream without Symbol.asyncIterator method)

src/util/isAsyncIterable.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import {isFunction} from "./isFunction.js"
2+
3+
/**
4+
* Checks if the value is async iterable
5+
*/
6+
export const isAsyncIterable = (
7+
value: object
8+
): value is AsyncIterable<Uint8Array> => (
9+
isFunction((value as AsyncIterable<Uint8Array>)[Symbol.asyncIterator])
10+
)

0 commit comments

Comments
 (0)