Skip to content

Commit 939ecff

Browse files
committed
http2: add diagnostics channels for client stream request body
These would allow inspection of HTTP/2 client stream request bodies. Signed-off-by: Darshan Sen <[email protected]>
1 parent ba7cdf4 commit 939ecff

File tree

3 files changed

+109
-0
lines changed

3 files changed

+109
-0
lines changed

doc/api/diagnostics_channel.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1243,6 +1243,19 @@ Emitted when an error occurs during the processing of a stream on the client.
12431243

12441244
Emitted when a stream is received on the client.
12451245

1246+
##### Event: `'http2.client.stream.bodyChunkSent'`
1247+
1248+
* `stream` {ClientHttp2Stream}
1249+
* `chunk` {Buffer}
1250+
1251+
Emitted when a chunk of the client stream body is being sent.
1252+
1253+
##### Event: `'http2.client.stream.bodySent'`
1254+
1255+
* `stream` {ClientHttp2Stream}
1256+
1257+
Emitted after the client stream body has been fully sent.
1258+
12461259
##### Event: `'http2.client.stream.close'`
12471260

12481261
* `stream` {ClientHttp2Stream}

lib/internal/http2/core.js

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,14 @@ const { UV_EOF } = internalBinding('uv');
186186
const { StreamPipe } = internalBinding('stream_pipe');
187187
const { _connectionListener: httpConnectionListener } = http;
188188

189+
const { Buffer } = require('buffer');
190+
189191
const dc = require('diagnostics_channel');
190192
const onClientStreamCreatedChannel = dc.channel('http2.client.stream.created');
191193
const onClientStreamStartChannel = dc.channel('http2.client.stream.start');
192194
const onClientStreamErrorChannel = dc.channel('http2.client.stream.error');
195+
const onClientStreamBodyChunkSentChannel = dc.channel('http2.client.stream.bodyChunkSent');
196+
const onClientStreamBodySentChannel = dc.channel('http2.client.stream.bodySent');
193197
const onClientStreamFinishChannel = dc.channel('http2.client.stream.finish');
194198
const onClientStreamCloseChannel = dc.channel('http2.client.stream.close');
195199
const onServerStreamCreatedChannel = dc.channel('http2.server.stream.created');
@@ -2300,6 +2304,34 @@ class Http2Stream extends Duplex {
23002304
req = writeGeneric(this, data, encoding, writeCallback);
23012305

23022306
trackWriteState(this, req.bytes);
2307+
2308+
if (this.session[kType] === NGHTTP2_SESSION_CLIENT && onClientStreamBodyChunkSentChannel.hasSubscribers) {
2309+
let chunk;
2310+
2311+
if (ArrayIsArray(data)) {
2312+
const buffers = [];
2313+
for (let i = 0; i < data.length; ++i) {
2314+
if (typeof data[i] === 'object') {
2315+
if (typeof data[i].chunk === 'string') {
2316+
buffers.push(Buffer.from(data[i].chunk, data[i].encoding));
2317+
} else {
2318+
buffers.push(data[i].chunk);
2319+
}
2320+
}
2321+
}
2322+
2323+
chunk = Buffer.concat(buffers);
2324+
} else if (typeof data === 'string') {
2325+
chunk = Buffer.from(data);
2326+
} else {
2327+
chunk = data;
2328+
}
2329+
2330+
onClientStreamBodyChunkSentChannel.publish({
2331+
stream: this,
2332+
chunk,
2333+
});
2334+
}
23032335
}
23042336

23052337
_write(data, encoding, cb) {
@@ -2317,6 +2349,10 @@ class Http2Stream extends Duplex {
23172349
}
23182350
debugStreamObj(this, 'shutting down writable on _final');
23192351
ReflectApply(shutdownWritable, this, [cb]);
2352+
2353+
if (this.session[kType] === NGHTTP2_SESSION_CLIENT && onClientStreamBodySentChannel.hasSubscribers) {
2354+
onClientStreamBodySentChannel.publish({ stream: this });
2355+
}
23202356
}
23212357

23222358
_read(nread) {
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
7+
// This test ensures that the built-in HTTP/2 diagnostics channels are reporting
8+
// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and
9+
// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are
10+
// being sent.
11+
12+
const assert = require('assert');
13+
const dc = require('diagnostics_channel');
14+
const http2 = require('http2');
15+
const { Duplex } = require('stream');
16+
17+
let bodyChunkSent = false;
18+
19+
dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, chunk }) => {
20+
// Since ClientHttp2Stream is not exported from any module, this just checks
21+
// if the stream is an instance of Duplex.
22+
assert.ok(stream instanceof Duplex);
23+
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');
24+
25+
assert.strictEqual(Buffer.isBuffer(chunk), true);
26+
assert.strictEqual(chunk.toString(), 'foobarbaz');
27+
28+
bodyChunkSent = true;
29+
}));
30+
31+
dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {
32+
// 'http2.client.stream.bodyChunkSent' must run first.
33+
assert.ok(bodyChunkSent);
34+
35+
// Since ClientHttp2Stream is not exported from any module, this just checks
36+
// if the stream is an instance of Duplex.
37+
assert.ok(stream instanceof Duplex);
38+
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');
39+
}));
40+
41+
const server = http2.createServer();
42+
server.on('stream', common.mustCall((stream) => {
43+
stream.respond({}, { endStream: true });
44+
}));
45+
46+
server.listen(0, common.mustCall(() => {
47+
const port = server.address().port;
48+
const client = http2.connect(`http://localhost:${port}`);
49+
50+
const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });
51+
stream.write('foo');
52+
stream.write(Buffer.from('bar'));
53+
stream.write(new TextEncoder().encode('baz'));
54+
stream.end();
55+
56+
stream.on('response', common.mustCall(() => {
57+
client.close();
58+
server.close();
59+
}));
60+
}, 1));

0 commit comments

Comments
 (0)