Skip to content

Commit 4fe325d

Browse files
authored
stream: preserve AsyncLocalStorage on finished only when needed
PR-URL: nodejs/node#59873 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Daniel Lemire <[email protected]> Reviewed-By: Yagiz Nizipli <[email protected]>
1 parent 6176222 commit 4fe325d

File tree

5 files changed

+124
-3
lines changed

5 files changed

+124
-3
lines changed

benchmark/streams/finished.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { Readable, Writable } = require('stream');
5+
const { finished } = require('stream/promises');
6+
7+
const bench = common.createBenchmark(main, {
8+
n: [1e7],
9+
streamType: ['readable', 'writable'],
10+
});
11+
12+
async function main({ n, streamType }) {
13+
bench.start();
14+
15+
for (let i = 0; i < n; i++) {
16+
let stream;
17+
18+
switch (streamType) {
19+
case 'readable':
20+
stream = new Readable({ read() { this.push(null); } });
21+
stream.resume();
22+
break;
23+
case 'writable':
24+
stream = new Writable({ write(chunk, enc, cb) { cb(); } });
25+
stream.end();
26+
break;
27+
}
28+
29+
await finished(stream);
30+
}
31+
32+
bench.end(n);
33+
}

lib/internal/streams/end-of-stream.js

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ const {
4444
kIsClosedPromise,
4545
} = require('internal/streams/utils');
4646

47+
const { getHookArrays } = require('internal/async_hooks');
48+
const AsyncContextFrame = require('internal/async_context_frame');
49+
4750
// Lazy load
4851
let AsyncResource;
4952
let addAbortListener;
@@ -74,9 +77,14 @@ function eos(stream, options, callback) {
7477
validateFunction(callback, 'callback');
7578
validateAbortSignal(options.signal, 'options.signal');
7679

77-
// Avoid AsyncResource.bind() because it calls ObjectDefineProperties which
78-
// is a bottleneck here.
79-
callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM'));
80+
if (AsyncContextFrame.current() ||
81+
getHookArrays()[0].length > 0) {
82+
// Avoid AsyncResource.bind() because it calls ObjectDefineProperties which
83+
// is a bottleneck here.
84+
callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM'));
85+
} else {
86+
callback = once(callback);
87+
}
8088

8189
if (isReadableStream(stream) || isWritableStream(stream)) {
8290
return eosWeb(stream, options, callback);
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Flags: --expose-internals
2+
'use strict';
3+
4+
const common = require('../common');
5+
const { Readable, finished } = require('stream');
6+
const { AsyncLocalStorage } = require('async_hooks');
7+
const { strictEqual } = require('assert');
8+
const AsyncContextFrame = require('internal/async_context_frame');
9+
const internalAsyncHooks = require('internal/async_hooks');
10+
11+
// This test verifies that ALS context is preserved when using stream.finished()
12+
13+
const als = new AsyncLocalStorage();
14+
const readable = new Readable();
15+
16+
als.run('test-context-1', () => {
17+
finished(readable, common.mustCall(() => {
18+
strictEqual(AsyncContextFrame.enabled || internalAsyncHooks.getHookArrays()[0].length > 0,
19+
true, 'One of AsyncContextFrame or async hooks criteria should be met');
20+
strictEqual(als.getStore(), 'test-context-1', 'ALS context should be preserved');
21+
}));
22+
});
23+
24+
readable.destroy();
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Flags: --expose-internals
2+
'use strict';
3+
4+
const common = require('../common');
5+
const { Readable, finished } = require('stream');
6+
const { createHook, executionAsyncId } = require('async_hooks');
7+
const { strictEqual } = require('assert');
8+
const internalAsyncHooks = require('internal/async_hooks');
9+
10+
// This test verifies that when there are active async hooks, stream.finished() uses
11+
// the bindAsyncResource path
12+
13+
createHook({
14+
init(asyncId, type, triggerAsyncId) {
15+
if (type === 'STREAM_END_OF_STREAM') {
16+
const parentContext = contextMap.get(triggerAsyncId);
17+
contextMap.set(asyncId, parentContext);
18+
}
19+
}
20+
}).enable();
21+
22+
const contextMap = new Map();
23+
const asyncId = executionAsyncId();
24+
contextMap.set(asyncId, 'abc-123');
25+
const readable = new Readable();
26+
27+
finished(readable, common.mustCall(() => {
28+
const currentAsyncId = executionAsyncId();
29+
const ctx = contextMap.get(currentAsyncId);
30+
strictEqual(internalAsyncHooks.getHookArrays()[0].length > 0,
31+
true, 'Should have active user async hook');
32+
strictEqual(ctx, 'abc-123', 'Context should be preserved');
33+
}));
34+
35+
readable.destroy();
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Flags: --expose-internals --no-async-context-frame
2+
'use strict';
3+
4+
const common = require('../common');
5+
const { Readable, finished } = require('stream');
6+
const { strictEqual } = require('assert');
7+
const AsyncContextFrame = require('internal/async_context_frame');
8+
const internalAsyncHooks = require('internal/async_hooks');
9+
10+
// This test verifies that when there are no active async hooks, stream.finished() uses the default callback path
11+
12+
const readable = new Readable();
13+
14+
finished(readable, common.mustCall(() => {
15+
strictEqual(internalAsyncHooks.getHookArrays()[0].length === 0,
16+
true, 'Should not have active user async hook');
17+
strictEqual(AsyncContextFrame.current() || internalAsyncHooks.getHookArrays()[0].length > 0,
18+
false, 'Default callback path should be used');
19+
}));
20+
21+
readable.destroy();

0 commit comments

Comments
 (0)