stream: bypass legacy destroy for pipeline and async iteration

PR-URL: https://github.com/nodejs/node/pull/38505
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Robert Nagy 2021-05-02 18:17:18 +02:00 committed by Daniele Belardi
parent c0becbc1bd
commit f4609bdf3f
No known key found for this signature in database
GPG Key ID: 91C67200C945858D
6 changed files with 188 additions and 12 deletions

View File

@ -53,6 +53,7 @@ const {
prepareError,
} = require('_http_common');
const { OutgoingMessage } = require('_http_outgoing');
const { kDestroy } = require('internal/streams/destroy');
const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
@ -609,6 +610,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
req.res = res;
res.req = req;
res[kDestroy] = null;
// Add our listener first, so that we guarantee socket cleanup
res.on('end', responseOnEnd);

View File

@ -31,6 +31,7 @@ const {
} = primordials;
const { Readable, finished } = require('stream');
const { kDestroy } = require('internal/streams/destroy');
const kHeaders = Symbol('kHeaders');
const kHeadersCount = Symbol('kHeadersCount');
@ -188,13 +189,18 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
this.socket.destroy(err);
const cleanup = finished(this.socket, (e) => {
cleanup();
onError(this, e || err, cb);
process.nextTick(onError, this, e || err, cb);
});
} else {
onError(this, err, cb);
process.nextTick(onError, this, err, cb);
}
};
IncomingMessage.prototype[kDestroy] = function(err) {
this.socket = null;
this.destroy(err);
};
IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
function _addHeaderLines(headers, n) {
if (headers && headers.length) {

View File

@ -231,9 +231,7 @@ function onServerResponseClose() {
// where the ServerResponse object has already been deconstructed.
// Fortunately, that requires only a single if check. :-)
if (this._httpMessage) {
this._httpMessage.destroyed = true;
this._httpMessage._closed = true;
this._httpMessage.emit('close');
emitCloseNT(this._httpMessage);
}
}
@ -837,9 +835,11 @@ function resOnFinish(req, res, socket, state, server) {
}
function emitCloseNT(self) {
self.destroyed = true;
self._closed = true;
self.emit('close');
if (!self.destroyed) {
self.destroyed = true;
self._closed = true;
self.emit('close');
}
}
// The following callback is issued after the headers have been read on a

View File

@ -5,6 +5,7 @@ const {
codes: {
ERR_MULTIPLE_CALLBACK,
},
AbortError,
} = require('internal/errors');
const {
Symbol,
@ -363,15 +364,65 @@ function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}
const kDestroyed = Symbol('kDestroyed');
function emitCloseLegacy(stream) {
stream.emit('close');
}
function emitErrorCloseLegacy(stream, err) {
stream.emit('error', err);
process.nextTick(emitCloseLegacy, stream);
}
function isDestroyed(stream) {
return stream.destroyed || stream[kDestroyed];
}
function isReadable(stream) {
return stream.readable && !stream.readableEnded && !isDestroyed(stream);
}
function isWritable(stream) {
return stream.writable && !stream.writableEnded && !isDestroyed(stream);
}
// Normalize destroy for legacy.
function destroyer(stream, err) {
if (isRequest(stream)) return stream.abort();
if (isRequest(stream.req)) return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);
if (typeof stream.close === 'function') return stream.close();
if (isDestroyed(stream)) {
return;
}
if (!err && (isReadable(stream) || isWritable(stream))) {
err = new AbortError();
}
// TODO: Remove isRequest branches.
if (typeof stream[kDestroy] === 'function') {
stream[kDestroy](err);
} else if (isRequest(stream)) {
stream.abort();
} else if (isRequest(stream.req)) {
stream.req.abort();
} else if (typeof stream.destroy === 'function') {
stream.destroy(err);
} else if (typeof stream.close === 'function') {
// TODO: Don't lose err?
stream.close();
} else if (err) {
process.nextTick(emitErrorCloseLegacy, stream);
} else {
process.nextTick(emitCloseLegacy, stream);
}
if (!stream.destroyed) {
stream[kDestroyed] = true;
}
}
module.exports = {
kDestroy,
isDestroyed,
construct,
destroyer,
destroy,

View File

@ -30,6 +30,7 @@ const {
} = require('internal/util');
const pipeline = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
@ -45,6 +46,7 @@ Stream.pipeline = pipeline;
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
Stream.destroy = destroyer;
ObjectDefineProperty(Stream, 'promises', {
configurable: true,

View File

@ -0,0 +1,115 @@
'use strict';
const common = require('../common');
const {
Writable,
Readable,
destroy
} = require('stream');
const assert = require('assert');
const http = require('http');
{
const r = new Readable({ read() {} });
destroy(r);
assert.strictEqual(r.destroyed, true);
r.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
r.on('close', common.mustCall());
}
{
const r = new Readable({ read() {} });
destroy(r, new Error('asd'));
assert.strictEqual(r.destroyed, true);
r.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
r.on('close', common.mustCall());
}
{
const w = new Writable({ write() {} });
destroy(w);
assert.strictEqual(w.destroyed, true);
w.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
w.on('close', common.mustCall());
}
{
const w = new Writable({ write() {} });
destroy(w, new Error('asd'));
assert.strictEqual(w.destroyed, true);
w.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
w.on('close', common.mustCall());
}
{
const server = http.createServer((req, res) => {
destroy(req);
req.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
req.on('close', common.mustCall(() => {
res.end('hello');
}));
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
req.write('asd');
req.on('response', (res) => {
const buf = [];
res.on('data', (data) => buf.push(data));
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(
Buffer.concat(buf),
Buffer.from('hello')
);
server.close();
}));
});
});
}
{
const server = http.createServer((req, res) => {
req
.resume()
.on('end', () => {
destroy(req);
})
.on('error', common.mustNotCall());
req.on('close', common.mustCall(() => {
res.end('hello');
}));
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
req.write('asd');
req.on('response', (res) => {
const buf = [];
res.on('data', (data) => buf.push(data));
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(
Buffer.concat(buf),
Buffer.from('hello')
);
server.close();
}));
});
});
}