Skip to content

Commit a6c2fce

Browse files
committed
stream: add diagnostics channel for tracking chunks added to a readable stream
Signed-off-by: Darshan Sen <[email protected]>
1 parent c8ca16c commit a6c2fce

File tree

3 files changed

+41
-15
lines changed

3 files changed

+41
-15
lines changed

doc/api/diagnostics_channel.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1399,6 +1399,17 @@ Emitted when [`net.Server.listen()`][] is returning an error.
13991399

14001400
Emitted when a new UDP socket is created.
14011401

1402+
#### Stream
1403+
1404+
> Stability: 1 - Experimental
1405+
1406+
##### Event: `'stream.readable.addChunk'`
1407+
1408+
* `stream` {stream.Readable}
1409+
* `chunk` {Buffer}
1410+
1411+
Emitted when a chunk is added to a readable stream.
1412+
14021413
#### Process
14031414

14041415
> Stability: 1 - Experimental

lib/internal/inspector/network_http2.js

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -180,22 +180,26 @@ function onClientStreamFinish({ stream, headers }) {
180180
charset,
181181
},
182182
});
183+
}
184+
185+
/**
186+
* When a chunk of the response body has been received, cache it until `getResponseBody` request
187+
* https://chromedevtools.github.io/devtools-protocol/1-3/Network/#method-getResponseBody or
188+
* stream it with `streamResourceContent` request.
189+
* https://chromedevtools.github.io/devtools-protocol/tot/Network/#method-streamResourceContent
190+
* @param {{ stream: import('http2').ClientHttp2Stream, chunk: Buffer }} event
191+
*/
192+
function onStreamReadableAddChunk({ stream, chunk }) {
193+
if (typeof stream[kInspectorRequestId] !== 'string') {
194+
return;
195+
}
183196

184-
stream.on('data', (chunk) => {
185-
/**
186-
* When a chunk of the response body has been received, cache it until `getResponseBody` request
187-
* https://chromedevtools.github.io/devtools-protocol/1-3/Network/#method-getResponseBody or
188-
* stream it with `streamResourceContent` request.
189-
* https://chromedevtools.github.io/devtools-protocol/tot/Network/#method-streamResourceContent
190-
*/
191-
192-
Network.dataReceived({
193-
requestId: stream[kInspectorRequestId],
194-
timestamp: getMonotonicTime(),
195-
dataLength: chunk.byteLength,
196-
encodedDataLength: chunk.byteLength,
197-
data: chunk,
198-
});
197+
Network.dataReceived({
198+
requestId: stream[kInspectorRequestId],
199+
timestamp: getMonotonicTime(),
200+
dataLength: chunk.byteLength,
201+
encodedDataLength: chunk.byteLength,
202+
data: chunk,
199203
});
200204
}
201205

@@ -228,4 +232,5 @@ module.exports = registerDiagnosticChannels([
228232
['http2.client.stream.close', onClientStreamClose],
229233
['http2.client.stream.bodyChunkSent', onClientStreamBodyChunkSent],
230234
['http2.client.stream.bodySent', onClientStreamBodySent],
235+
['stream.readable.addChunk', onStreamReadableAddChunk],
231236
]);

lib/internal/streams/readable.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ const FastBuffer = Buffer[SymbolSpecies];
9292
const { StringDecoder } = require('string_decoder');
9393
const from = require('internal/streams/from');
9494

95+
const dc = require('diagnostics_channel');
96+
const onAddChunkChannel = dc.channel('stream.readable.addChunk');
97+
9598
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
9699
ObjectSetPrototypeOf(Readable, Stream);
97100
const nop = () => {};
@@ -546,6 +549,13 @@ function canPushMore(state) {
546549
}
547550

548551
function addChunk(stream, state, chunk, addToFront) {
552+
if (onAddChunkChannel.hasSubscribers) {
553+
onAddChunkChannel.publish({
554+
stream,
555+
chunk,
556+
});
557+
}
558+
549559
if ((state[kState] & (kFlowing | kSync | kDataListening)) === (kFlowing | kDataListening) && state.length === 0) {
550560
// Use the guard to avoid creating `Set()` repeatedly
551561
// when we have multiple pipes.

0 commit comments

Comments
 (0)