Skip to content

Commit 1760239

Browse files
committed
stream: add diagnostics for readable stream chunk additions
Signed-off-by: Darshan Sen <[email protected]>
1 parent ba7cdf4 commit 1760239

File tree

3 files changed

+77
-0
lines changed

3 files changed

+77
-0
lines changed

doc/api/diagnostics_channel.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,6 +1400,16 @@ added: v16.18.0
14001400

14011401
Emitted when a new process is created.
14021402

1403+
#### Stream
1404+
1405+
> Stability: 1 - Experimental
1406+
##### Event: `'stream.readable.addChunk'`
1407+
1408+
* `stream` {stream.Readable}
1409+
* `chunk` {Buffer}
1410+
1411+
Emitted when a chunk is added to a readable stream.
1412+
14031413
##### Event: `'execve'`
14041414

14051415
* `execPath` {string}

lib/internal/streams/readable.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ const FastBuffer = Buffer[SymbolSpecies];
9292
const { StringDecoder } = require('string_decoder');
9393
const from = require('internal/streams/from');
9494

95+
const dc = require('diagnostics_channel');
96+
const onAddChunkChannel = dc.channel('stream.readable.addChunk');
97+
9598
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
9699
ObjectSetPrototypeOf(Readable, Stream);
97100
const nop = () => {};
@@ -546,6 +549,13 @@ function canPushMore(state) {
546549
}
547550

548551
function addChunk(stream, state, chunk, addToFront) {
552+
if (onAddChunkChannel.hasSubscribers) {
553+
onAddChunkChannel.publish({
554+
stream,
555+
chunk,
556+
});
557+
}
558+
549559
if ((state[kState] & (kFlowing | kSync | kDataListening)) === (kFlowing | kDataListening) && state.length === 0) {
550560
// Use the guard to avoid creating `Set()` repeatedly
551561
// when we have multiple pipes.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
7+
// This test ensures that the built-in Stream diagnostics channels are reporting
8+
// the diagnostics messages for the 'stream.readable.addChunk' channel when
9+
// chunks are added to the readable stream by:
10+
// - readable.push()
11+
// - readable.unshift()
12+
13+
const assert = require('assert');
14+
const dc = require('diagnostics_channel');
15+
const { Readable } = require('stream');
16+
17+
const values = [
18+
'hello',
19+
'world',
20+
'🌍',
21+
];
22+
23+
const objects = [
24+
{ msg: values[0] },
25+
{ msg: values[1] },
26+
{ msg: values[2] },
27+
];
28+
29+
let i = 0;
30+
dc.subscribe('stream.readable.addChunk', common.mustCall(({ stream, chunk }) => {
31+
if (i < values.length) {
32+
assert.strictEqual(stream, readable);
33+
assert.strictEqual(chunk.toString(), values[i]);
34+
} else {
35+
assert.strictEqual(stream, objReadable);
36+
assert.strictEqual(chunk, objects[i - 3]);
37+
}
38+
++i;
39+
}, values.length * 2));
40+
41+
const readable = new Readable({
42+
read() {}
43+
});
44+
45+
readable.push(values[0]);
46+
readable.push(values[1]);
47+
readable.unshift(values[2]);
48+
readable.push(null);
49+
50+
const objReadable = new Readable({
51+
objectMode: true,
52+
read() {}
53+
});
54+
objReadable.push(objects[0]);
55+
objReadable.push(objects[1]);
56+
objReadable.unshift(objects[2]);
57+
objReadable.push(null);

0 commit comments

Comments
 (0)