stream: make Readable.from performance better

PR-URL: https://github.com/nodejs/node/pull/37609
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
wwwzbwcom 2021-03-05 19:55:00 +08:00 committed by Ruy Adorno
parent f07428ae51
commit 43c3b43ea3
No known key found for this signature in database
GPG Key ID: 97B01419BD92F80A
2 changed files with 58 additions and 19 deletions

View File

@ -0,0 +1,26 @@
'use strict';
const common = require('../common');
const Readable = require('stream').Readable;
const bench = common.createBenchmark(main, {
n: [1e7],
});
async function main({ n }) {
const arr = [];
for (let i = 0; i < n; i++) {
arr.push(`${i}`);
}
const s = new Readable.from(arr);
bench.start();
s.on('data', (data) => {
// eslint-disable-next-line no-unused-expressions
data;
});
s.on('close', () => {
bench.end(n);
});
}

View File

@ -3,7 +3,7 @@
const {
PromisePrototypeThen,
SymbolAsyncIterator,
SymbolIterator
SymbolIterator,
} = primordials;
const { Buffer } = require('buffer');
@ -25,18 +25,22 @@ function from(Readable, iterable, opts) {
});
}
if (iterable && iterable[SymbolAsyncIterator])
let isAsync = false;
if (iterable && iterable[SymbolAsyncIterator]) {
isAsync = true;
iterator = iterable[SymbolAsyncIterator]();
else if (iterable && iterable[SymbolIterator])
} else if (iterable && iterable[SymbolIterator]) {
isAsync = false;
iterator = iterable[SymbolIterator]();
else
} else {
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
}
const readable = new Readable({
objectMode: true,
highWaterMark: 1,
// TODO(ronag): What options should be allowed?
...opts
...opts,
});
// Flag to protect against _read
@ -75,23 +79,32 @@ function from(Readable, iterable, opts) {
}
async function next() {
try {
const { value, done } = await iterator.next();
if (done) {
readable.push(null);
} else {
const res = await value;
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) {
next();
for (;;) {
try {
const { value, done } = isAsync ?
await iterator.next() :
iterator.next();
if (done) {
readable.push(null);
} else {
reading = false;
const res = (value &&
typeof value.then === 'function') ?
await value :
value;
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) {
continue;
} else {
reading = false;
}
}
} catch (err) {
readable.destroy(err);
}
} catch (err) {
readable.destroy(err);
break;
}
}
return readable;