Skip to content

Commit 90c1618

Browse files
committed
stream: readable read one buffer at a time
Instead of wasting cycles concatenating buffers, just return each one by one.
1 parent 6176222 commit 90c1618

9 files changed

+55
-10
lines changed

lib/internal/streams/readable.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,13 @@ function howMuchToRead(n, state) {
631631
if ((state[kState] & kObjectMode) !== 0)
632632
return 1;
633633
if (NumberIsNaN(n)) {
634+
// Fast path for buffers.
635+
if ((state[kState] & kDecoder) === 0) {
636+
return state.length
637+
? state.buffer[state.bufferIndex].length
638+
: state.length;
639+
}
640+
634641
// Only flow one buffer at a time.
635642
if ((state[kState] & kFlowing) !== 0 && state.length)
636643
return state.buffer[state.bufferIndex].length;

test/parallel/test-stream-compose.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ const assert = require('assert');
490490

491491
newStream.end();
492492

493-
assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]);
493+
assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve Rogers'), Buffer.from('On your left')]);
494494
})().then(common.mustCall());
495495
}
496496

test/parallel/test-stream-push-strings.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ ms.on('readable', function() {
5959
results.push(String(chunk));
6060
});
6161

62-
const expect = [ 'first chunksecond to last chunk', 'last chunk' ];
62+
const expect = [ 'first chunk', 'second to last chunk', 'last chunk' ];
6363
process.on('exit', function() {
6464
assert.strictEqual(ms._chunks, -1);
6565
assert.deepStrictEqual(results, expect);

test/parallel/test-stream-readable-emittedReadable.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const readable = new Readable({
1010
// Initialized to false.
1111
assert.strictEqual(readable._readableState.emittedReadable, false);
1212

13-
const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
13+
const expected = [Buffer.from('foo'), Buffer.from('bar'), Buffer.from('quo'), null];
1414
readable.on('readable', common.mustCall(() => {
1515
// emittedReadable should be true when the readable event is emitted
1616
assert.strictEqual(readable._readableState.emittedReadable, true);

test/parallel/test-stream-readable-infinite-read.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ readable.on('readable', common.mustCall(function() {
2626
// TODO(mcollina): there is something odd in the highWaterMark logic
2727
// investigate.
2828
if (i === 1) {
29-
assert.strictEqual(data.length, 8192 * 2);
29+
assert.strictEqual(data.length, 8192);
3030
} else {
31-
assert.strictEqual(data.length, 8192 * 3);
31+
assert.strictEqual(data.length, 8192);
3232
}
3333
}, 11));
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
5+
const { Readable } = require('stream');
6+
7+
// Read one buffer at a time and don't waste cycles allocating
8+
// and copying into a new larger buffer.
9+
{
10+
const r = new Readable({
11+
read() {}
12+
});
13+
const buffers = [Buffer.allocUnsafe(5), Buffer.allocUnsafe(10)];
14+
for (const buf of buffers) {
15+
r.push(buf);
16+
}
17+
for (const buf of buffers) {
18+
assert.strictEqual(r.read(), buf);
19+
}
20+
}

test/parallel/test-stream-typedarray.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,19 @@ const views = common.getArrayBufferViews(buffer);
8383
readable.push(views[2]);
8484
readable.unshift(views[0]);
8585

86-
const buf = readable.read();
86+
let buf;
87+
88+
buf = readable.read();
89+
assert(buf instanceof Buffer);
90+
assert.deepStrictEqual([...buf], [...views[0]]);
91+
92+
buf = readable.read();
93+
assert(buf instanceof Buffer);
94+
assert.deepStrictEqual([...buf], [...views[1]]);
95+
96+
buf = readable.read();
8797
assert(buf instanceof Buffer);
88-
assert.deepStrictEqual([...buf], [...views[0], ...views[1], ...views[2]]);
98+
assert.deepStrictEqual([...buf], [...views[2]]);
8999
}
90100

91101
{

test/parallel/test-stream-uint8array.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,15 @@ const GHI = new Uint8Array([0x47, 0x48, 0x49]);
8080
readable.push(DEF);
8181
readable.unshift(ABC);
8282

83-
const buf = readable.read();
83+
let buf;
84+
85+
buf = readable.read();
86+
assert(buf instanceof Buffer);
87+
assert.deepStrictEqual([...buf], [...ABC]);
88+
89+
buf = readable.read();
8490
assert(buf instanceof Buffer);
85-
assert.deepStrictEqual([...buf], [...ABC, ...DEF]);
91+
assert.deepStrictEqual([...buf], [...DEF]);
8692
}
8793

8894
{

test/parallel/test-stream2-transform.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,9 @@ const { PassThrough, Transform } = require('stream');
282282
pt.write(Buffer.from('ef'), common.mustCall(function() {
283283
pt.end();
284284
}));
285-
assert.strictEqual(pt.read().toString(), 'abcdef');
285+
assert.strictEqual(pt.read().toString(), 'abc');
286+
assert.strictEqual(pt.read().toString(), 'd');
287+
assert.strictEqual(pt.read().toString(), 'ef');
286288
assert.strictEqual(pt.read(), null);
287289
});
288290
});

0 commit comments

Comments
 (0)