stream: preserve AsyncLocalStorage on finished only when needed

PR-URL: https://github.com/nodejs/node/pull/59873
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Daniel Lemire <daniel@lemire.me>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
This commit is contained in:
avcribl 2025-10-27 12:23:34 -07:00 committed by GitHub
parent 617622211f
commit 4fe325d93d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 124 additions and 3 deletions

View File

@ -0,0 +1,33 @@
'use strict';
const common = require('../common');
const { Readable, Writable } = require('stream');
const { finished } = require('stream/promises');
const bench = common.createBenchmark(main, {
n: [1e7],
streamType: ['readable', 'writable'],
});
async function main({ n, streamType }) {
bench.start();
for (let i = 0; i < n; i++) {
let stream;
switch (streamType) {
case 'readable':
stream = new Readable({ read() { this.push(null); } });
stream.resume();
break;
case 'writable':
stream = new Writable({ write(chunk, enc, cb) { cb(); } });
stream.end();
break;
}
await finished(stream);
}
bench.end(n);
}

View File

@ -44,6 +44,9 @@ const {
kIsClosedPromise,
} = require('internal/streams/utils');
const { getHookArrays } = require('internal/async_hooks');
const AsyncContextFrame = require('internal/async_context_frame');
// Lazy load
let AsyncResource;
let addAbortListener;
@ -74,9 +77,14 @@ function eos(stream, options, callback) {
validateFunction(callback, 'callback');
validateAbortSignal(options.signal, 'options.signal');
if (AsyncContextFrame.current() ||
getHookArrays()[0].length > 0) {
// Avoid AsyncResource.bind() because it calls ObjectDefineProperties which
// is a bottleneck here.
callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM'));
} else {
callback = once(callback);
}
if (isReadableStream(stream) || isWritableStream(stream)) {
return eosWeb(stream, options, callback);

View File

@ -0,0 +1,24 @@
// Flags: --expose-internals
'use strict';
const common = require('../common');
const { Readable, finished } = require('stream');
const { AsyncLocalStorage } = require('async_hooks');
const { strictEqual } = require('assert');
const AsyncContextFrame = require('internal/async_context_frame');
const internalAsyncHooks = require('internal/async_hooks');
// This test verifies that ALS context is preserved when using stream.finished()
const als = new AsyncLocalStorage();
const readable = new Readable();
als.run('test-context-1', () => {
finished(readable, common.mustCall(() => {
strictEqual(AsyncContextFrame.enabled || internalAsyncHooks.getHookArrays()[0].length > 0,
true, 'One of AsyncContextFrame or async hooks criteria should be met');
strictEqual(als.getStore(), 'test-context-1', 'ALS context should be preserved');
}));
});
readable.destroy();

View File

@ -0,0 +1,35 @@
// Flags: --expose-internals
'use strict';
const common = require('../common');
const { Readable, finished } = require('stream');
const { createHook, executionAsyncId } = require('async_hooks');
const { strictEqual } = require('assert');
const internalAsyncHooks = require('internal/async_hooks');
// This test verifies that when there are active async hooks, stream.finished() uses
// the bindAsyncResource path
createHook({
init(asyncId, type, triggerAsyncId) {
if (type === 'STREAM_END_OF_STREAM') {
const parentContext = contextMap.get(triggerAsyncId);
contextMap.set(asyncId, parentContext);
}
}
}).enable();
const contextMap = new Map();
const asyncId = executionAsyncId();
contextMap.set(asyncId, 'abc-123');
const readable = new Readable();
finished(readable, common.mustCall(() => {
const currentAsyncId = executionAsyncId();
const ctx = contextMap.get(currentAsyncId);
strictEqual(internalAsyncHooks.getHookArrays()[0].length > 0,
true, 'Should have active user async hook');
strictEqual(ctx, 'abc-123', 'Context should be preserved');
}));
readable.destroy();

View File

@ -0,0 +1,21 @@
// Flags: --expose-internals --no-async-context-frame
'use strict';
const common = require('../common');
const { Readable, finished } = require('stream');
const { strictEqual } = require('assert');
const AsyncContextFrame = require('internal/async_context_frame');
const internalAsyncHooks = require('internal/async_hooks');
// This test verifies that when there are no active async hooks, stream.finished() uses the default callback path
const readable = new Readable();
finished(readable, common.mustCall(() => {
strictEqual(internalAsyncHooks.getHookArrays()[0].length === 0,
true, 'Should not have active user async hook');
strictEqual(AsyncContextFrame.current() || internalAsyncHooks.getHookArrays()[0].length > 0,
false, 'Default callback path should be used');
}));
readable.destroy();