From 4fe325d93de6eb58fcc74aee0e74ebdd1ea4e5b0 Mon Sep 17 00:00:00 2001 From: avcribl <129687137+avcribl@users.noreply.github.com> Date: Mon, 27 Oct 2025 12:23:34 -0700 Subject: [PATCH] stream: preserve AsyncLocalStorage on finished only when needed PR-URL: https://github.com/nodejs/node/pull/59873 Reviewed-By: James M Snell Reviewed-By: Matteo Collina Reviewed-By: Daniel Lemire Reviewed-By: Yagiz Nizipli --- benchmark/streams/finished.js | 33 +++++++++++++++++ lib/internal/streams/end-of-stream.js | 14 ++++++-- ...est-stream-finished-async-local-storage.js | 24 +++++++++++++ ...-stream-finished-bindAsyncResource-path.js | 35 +++++++++++++++++++ .../test-stream-finished-default-path.js | 21 +++++++++++ 5 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 benchmark/streams/finished.js create mode 100644 test/parallel/test-stream-finished-async-local-storage.js create mode 100644 test/parallel/test-stream-finished-bindAsyncResource-path.js create mode 100644 test/parallel/test-stream-finished-default-path.js diff --git a/benchmark/streams/finished.js b/benchmark/streams/finished.js new file mode 100644 index 0000000000..5fd2820549 --- /dev/null +++ b/benchmark/streams/finished.js @@ -0,0 +1,33 @@ +'use strict'; + +const common = require('../common'); +const { Readable, Writable } = require('stream'); +const { finished } = require('stream/promises'); + +const bench = common.createBenchmark(main, { + n: [1e7], + streamType: ['readable', 'writable'], +}); + +async function main({ n, streamType }) { + bench.start(); + + for (let i = 0; i < n; i++) { + let stream; + + switch (streamType) { + case 'readable': + stream = new Readable({ read() { this.push(null); } }); + stream.resume(); + break; + case 'writable': + stream = new Writable({ write(chunk, enc, cb) { cb(); } }); + stream.end(); + break; + } + + await finished(stream); + } + + bench.end(n); +} diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 850d431413..99587ae254 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -44,6 +44,9 @@ const { kIsClosedPromise, } = require('internal/streams/utils'); +const { getHookArrays } = require('internal/async_hooks'); +const AsyncContextFrame = require('internal/async_context_frame'); + // Lazy load let AsyncResource; let addAbortListener; @@ -74,9 +77,14 @@ function eos(stream, options, callback) { validateFunction(callback, 'callback'); validateAbortSignal(options.signal, 'options.signal'); - // Avoid AsyncResource.bind() because it calls ObjectDefineProperties which - // is a bottleneck here. - callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM')); + if (AsyncContextFrame.current() || + getHookArrays()[0].length > 0) { + // Avoid AsyncResource.bind() because it calls ObjectDefineProperties which + // is a bottleneck here. + callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM')); + } else { + callback = once(callback); + } if (isReadableStream(stream) || isWritableStream(stream)) { return eosWeb(stream, options, callback); diff --git a/test/parallel/test-stream-finished-async-local-storage.js b/test/parallel/test-stream-finished-async-local-storage.js new file mode 100644 index 0000000000..1440d2bb0a --- /dev/null +++ b/test/parallel/test-stream-finished-async-local-storage.js @@ -0,0 +1,24 @@ +// Flags: --expose-internals +'use strict'; + +const common = require('../common'); +const { Readable, finished } = require('stream'); +const { AsyncLocalStorage } = require('async_hooks'); +const { strictEqual } = require('assert'); +const AsyncContextFrame = require('internal/async_context_frame'); +const internalAsyncHooks = require('internal/async_hooks'); + +// This test verifies that ALS context is preserved when using stream.finished() + +const als = new AsyncLocalStorage(); +const readable = new Readable(); + +als.run('test-context-1', () => { + finished(readable, common.mustCall(() => { + strictEqual(AsyncContextFrame.enabled || internalAsyncHooks.getHookArrays()[0].length > 0, + true, 'One of AsyncContextFrame or async hooks criteria should be met'); + strictEqual(als.getStore(), 'test-context-1', 'ALS context should be preserved'); + })); +}); + +readable.destroy(); diff --git a/test/parallel/test-stream-finished-bindAsyncResource-path.js b/test/parallel/test-stream-finished-bindAsyncResource-path.js new file mode 100644 index 0000000000..6bec9587e3 --- /dev/null +++ b/test/parallel/test-stream-finished-bindAsyncResource-path.js @@ -0,0 +1,35 @@ +// Flags: --expose-internals +'use strict'; + +const common = require('../common'); +const { Readable, finished } = require('stream'); +const { createHook, executionAsyncId } = require('async_hooks'); +const { strictEqual } = require('assert'); +const internalAsyncHooks = require('internal/async_hooks'); + +// This test verifies that when there are active async hooks, stream.finished() uses +// the bindAsyncResource path + +createHook({ + init(asyncId, type, triggerAsyncId) { + if (type === 'STREAM_END_OF_STREAM') { + const parentContext = contextMap.get(triggerAsyncId); + contextMap.set(asyncId, parentContext); + } + } +}).enable(); + +const contextMap = new Map(); +const asyncId = executionAsyncId(); +contextMap.set(asyncId, 'abc-123'); +const readable = new Readable(); + +finished(readable, common.mustCall(() => { + const currentAsyncId = executionAsyncId(); + const ctx = contextMap.get(currentAsyncId); + strictEqual(internalAsyncHooks.getHookArrays()[0].length > 0, + true, 'Should have active user async hook'); + strictEqual(ctx, 'abc-123', 'Context should be preserved'); +})); + +readable.destroy(); diff --git a/test/parallel/test-stream-finished-default-path.js b/test/parallel/test-stream-finished-default-path.js new file mode 100644 index 0000000000..31fcd8d175 --- /dev/null +++ b/test/parallel/test-stream-finished-default-path.js @@ -0,0 +1,21 @@ +// Flags: --expose-internals --no-async-context-frame +'use strict'; + +const common = require('../common'); +const { Readable, finished } = require('stream'); +const { strictEqual } = require('assert'); +const AsyncContextFrame = require('internal/async_context_frame'); +const internalAsyncHooks = require('internal/async_hooks'); + +// This test verifies that when there are no active async hooks, stream.finished() uses the default callback path + +const readable = new Readable(); + +finished(readable, common.mustCall(() => { + strictEqual(internalAsyncHooks.getHookArrays()[0].length === 0, + true, 'Should not have active user async hook'); + strictEqual(AsyncContextFrame.current() || internalAsyncHooks.getHookArrays()[0].length > 0, + false, 'Default callback path should be used'); +})); + +readable.destroy();