stream: reduce overhead of transfer

PR-URL: https://github.com/nodejs/node/pull/50107
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
This commit is contained in:
Vinicius Lourenço 2023-10-12 11:37:41 -03:00 committed by GitHub
parent 760b5dd259
commit a85e4186e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 163 additions and 85 deletions

View File

@ -0,0 +1,52 @@
'use strict';
const common = require('../common.js');
const { MessageChannel } = require('worker_threads');
const { WritableStream, TransformStream, ReadableStream } = require('stream/web');
const bench = common.createBenchmark(main, {
payload: ['WritableStream', 'ReadableStream', 'TransformStream'],
n: [1e4],
});
function main({ n, payload: payloadType }) {
let createPayload;
let messages = 0;
switch (payloadType) {
case 'WritableStream':
createPayload = () => new WritableStream();
break;
case 'ReadableStream':
createPayload = () => new ReadableStream();
break;
case 'TransformStream':
createPayload = () => new TransformStream();
break;
default:
throw new Error('Unsupported payload type');
}
const { port1, port2 } = new MessageChannel();
port2.onmessage = onMessage;
function onMessage() {
if (messages++ === n) {
bench.end(n);
port1.close();
} else {
send();
}
}
function send() {
const stream = createPayload();
port1.postMessage(stream, [stream]);
}
bench.start();
send();
}

View File

@ -17,7 +17,6 @@ const {
PromisePrototypeThen, PromisePrototypeThen,
PromiseResolve, PromiseResolve,
PromiseReject, PromiseReject,
ReflectConstruct,
SafePromiseAll, SafePromiseAll,
Symbol, Symbol,
SymbolAsyncIterator, SymbolAsyncIterator,
@ -642,26 +641,37 @@ ObjectDefineProperties(ReadableStream.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStream.name), [SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStream.name),
}); });
function TransferredReadableStream() { function InternalTransferredReadableStream() {
return ReflectConstruct( markTransferMode(this, false, true);
function() { this[kType] = 'ReadableStream';
markTransferMode(this, false, true); this[kState] = {
this[kType] = 'ReadableStream'; disturbed: false,
this[kState] = { reader: undefined,
disturbed: false, state: 'readable',
state: 'readable', storedError: undefined,
storedError: undefined, stream: undefined,
stream: undefined, transfer: {
transfer: { writable: undefined,
writable: undefined, port1: undefined,
port: undefined, port2: undefined,
promise: undefined, promise: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
}, },
[], ReadableStream); };
this[kIsClosedPromise] = createDeferredPromise();
} }
ObjectSetPrototypeOf(InternalTransferredReadableStream.prototype, ReadableStream.prototype);
ObjectSetPrototypeOf(InternalTransferredReadableStream, ReadableStream);
function TransferredReadableStream() {
const stream = new InternalTransferredReadableStream();
stream.constructor = ReadableStream;
return stream;
}
TransferredReadableStream.prototype[kDeserialize] = () => {}; TransferredReadableStream.prototype[kDeserialize] = () => {};
class ReadableStreamBYOBRequest { class ReadableStreamBYOBRequest {

View File

@ -4,9 +4,9 @@ const {
FunctionPrototypeBind, FunctionPrototypeBind,
FunctionPrototypeCall, FunctionPrototypeCall,
ObjectDefineProperties, ObjectDefineProperties,
ObjectSetPrototypeOf,
PromisePrototypeThen, PromisePrototypeThen,
PromiseResolve, PromiseResolve,
ReflectConstruct,
SymbolToStringTag, SymbolToStringTag,
Symbol, Symbol,
} = primordials; } = primordials;
@ -247,25 +247,33 @@ ObjectDefineProperties(TransformStream.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStream.name), [SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStream.name),
}); });
function TransferredTransformStream() { function InternalTransferredTransformStream() {
return ReflectConstruct( markTransferMode(this, false, true);
function() { this[kType] = 'TransformStream';
markTransferMode(this, false, true); this[kState] = {
this[kType] = 'TransformStream'; readable: undefined,
this[kState] = { writable: undefined,
readable: undefined, backpressure: undefined,
writable: undefined, backpressureChange: {
backpressure: undefined, promise: undefined,
backpressureChange: { resolve: undefined,
promise: undefined, reject: undefined,
resolve: undefined,
reject: undefined,
},
controller: undefined,
};
}, },
[], TransformStream); controller: undefined,
};
} }
ObjectSetPrototypeOf(InternalTransferredTransformStream.prototype, TransformStream.prototype);
ObjectSetPrototypeOf(InternalTransferredTransformStream, TransformStream);
function TransferredTransformStream() {
const stream = new InternalTransferredTransformStream();
stream.constructor = TransformStream;
return stream;
}
TransferredTransformStream.prototype[kDeserialize] = () => {}; TransferredTransformStream.prototype[kDeserialize] = () => {};
class TransformStreamDefaultController { class TransformStreamDefaultController {

View File

@ -6,10 +6,10 @@ const {
FunctionPrototypeBind, FunctionPrototypeBind,
FunctionPrototypeCall, FunctionPrototypeCall,
ObjectDefineProperties, ObjectDefineProperties,
ObjectSetPrototypeOf,
PromisePrototypeThen, PromisePrototypeThen,
PromiseResolve, PromiseResolve,
PromiseReject, PromiseReject,
ReflectConstruct,
Symbol, Symbol,
SymbolToStringTag, SymbolToStringTag,
} = primordials; } = primordials;
@ -326,55 +326,63 @@ ObjectDefineProperties(WritableStream.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStream.name), [SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStream.name),
}); });
function TransferredWritableStream() { function InternalTransferredWritableStream() {
return ReflectConstruct( markTransferMode(this, false, true);
function() { this[kType] = 'WritableStream';
markTransferMode(this, false, true); this[kState] = {
this[kType] = 'WritableStream'; close: createDeferredPromise(),
this[kState] = { closeRequest: {
close: createDeferredPromise(), promise: undefined,
closeRequest: { resolve: undefined,
promise: undefined, reject: undefined,
resolve: undefined,
reject: undefined,
},
inFlightWriteRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
inFlightCloseRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
pendingAbortRequest: {
abort: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
reason: undefined,
wasAlreadyErroring: false,
},
backpressure: false,
controller: undefined,
state: 'writable',
storedError: undefined,
writeRequests: [],
writer: undefined,
transfer: {
promise: undefined,
port1: undefined,
port2: undefined,
readable: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};
}, },
[], WritableStream); inFlightWriteRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
inFlightCloseRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
pendingAbortRequest: {
abort: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
reason: undefined,
wasAlreadyErroring: false,
},
backpressure: false,
controller: undefined,
state: 'writable',
storedError: undefined,
writeRequests: [],
writer: undefined,
transfer: {
readable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
} }
ObjectSetPrototypeOf(InternalTransferredWritableStream.prototype, WritableStream.prototype);
ObjectSetPrototypeOf(InternalTransferredWritableStream, WritableStream);
function TransferredWritableStream() {
const stream = new InternalTransferredWritableStream();
stream.constructor = WritableStream;
return stream;
}
TransferredWritableStream.prototype[kDeserialize] = () => {}; TransferredWritableStream.prototype[kDeserialize] = () => {};
class WritableStreamDefaultWriter { class WritableStreamDefaultWriter {