mirror of
https://github.com/zebrajr/node.git
synced 2025-12-07 12:20:50 +01:00
stream: fix y.pipe(x)+y.pipe(x)+y.unpipe(x)
Fix the uncommon situation when a readable stream is piped twice into the same destination stream, and then unpiped once. Previously, the `unpipe` event handlers weren’t able to tell whether they were corresponding to the “right” conceptual pipe that was being removed; this fixes this by adding a counter to the `unpipe` event handler and only removing a single piping destination at most. Fixes: https://github.com/nodejs/node/issues/12718 PR-URL: https://github.com/nodejs/node/pull/12746 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
0ca2dad3a6
commit
8ab8d6afd6
|
|
@ -499,10 +499,13 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
|
|||
src.once('end', endFn);
|
||||
|
||||
dest.on('unpipe', onunpipe);
|
||||
function onunpipe(readable) {
|
||||
function onunpipe(readable, unpipeInfo) {
|
||||
debug('onunpipe');
|
||||
if (readable === src) {
|
||||
cleanup();
|
||||
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
|
||||
unpipeInfo.hasUnpiped = true;
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -628,6 +631,7 @@ function pipeOnDrain(src) {
|
|||
|
||||
Readable.prototype.unpipe = function(dest) {
|
||||
var state = this._readableState;
|
||||
var unpipeInfo = { hasUnpiped: false };
|
||||
|
||||
// if we're not piping anywhere, then do nothing.
|
||||
if (state.pipesCount === 0)
|
||||
|
|
@ -647,7 +651,7 @@ Readable.prototype.unpipe = function(dest) {
|
|||
state.pipesCount = 0;
|
||||
state.flowing = false;
|
||||
if (dest)
|
||||
dest.emit('unpipe', this);
|
||||
dest.emit('unpipe', this, unpipeInfo);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -662,7 +666,7 @@ Readable.prototype.unpipe = function(dest) {
|
|||
state.flowing = false;
|
||||
|
||||
for (var i = 0; i < len; i++)
|
||||
dests[i].emit('unpipe', this);
|
||||
dests[i].emit('unpipe', this, unpipeInfo);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -676,7 +680,7 @@ Readable.prototype.unpipe = function(dest) {
|
|||
if (state.pipesCount === 1)
|
||||
state.pipes = state.pipes[0];
|
||||
|
||||
dest.emit('unpipe', this);
|
||||
dest.emit('unpipe', this, unpipeInfo);
|
||||
|
||||
return this;
|
||||
};
|
||||
|
|
|
|||
78
test/parallel/test-stream-pipe-same-destination-twice.js
Normal file
78
test/parallel/test-stream-pipe-same-destination-twice.js
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
'use strict';
|
||||
const common = require('../common');
|
||||
|
||||
// Regression test for https://github.com/nodejs/node/issues/12718.
|
||||
// Tests that piping a source stream twice to the same destination stream
|
||||
// works, and that a subsequent unpipe() call only removes the pipe *once*.
|
||||
const assert = require('assert');
|
||||
const { PassThrough, Writable } = require('stream');
|
||||
|
||||
{
|
||||
const passThrough = new PassThrough();
|
||||
const dest = new Writable({
|
||||
write: common.mustCall((chunk, encoding, cb) => {
|
||||
assert.strictEqual(`${chunk}`, 'foobar');
|
||||
cb();
|
||||
})
|
||||
});
|
||||
|
||||
passThrough.pipe(dest);
|
||||
passThrough.pipe(dest);
|
||||
|
||||
assert.strictEqual(passThrough._events.data.length, 2);
|
||||
assert.strictEqual(passThrough._readableState.pipesCount, 2);
|
||||
assert.strictEqual(passThrough._readableState.pipes[0], dest);
|
||||
assert.strictEqual(passThrough._readableState.pipes[1], dest);
|
||||
|
||||
passThrough.unpipe(dest);
|
||||
|
||||
assert.strictEqual(passThrough._events.data.length, 1);
|
||||
assert.strictEqual(passThrough._readableState.pipesCount, 1);
|
||||
assert.strictEqual(passThrough._readableState.pipes, dest);
|
||||
|
||||
passThrough.write('foobar');
|
||||
passThrough.pipe(dest);
|
||||
}
|
||||
|
||||
{
|
||||
const passThrough = new PassThrough();
|
||||
const dest = new Writable({
|
||||
write: common.mustCall((chunk, encoding, cb) => {
|
||||
assert.strictEqual(`${chunk}`, 'foobar');
|
||||
cb();
|
||||
}, 2)
|
||||
});
|
||||
|
||||
passThrough.pipe(dest);
|
||||
passThrough.pipe(dest);
|
||||
|
||||
assert.strictEqual(passThrough._events.data.length, 2);
|
||||
assert.strictEqual(passThrough._readableState.pipesCount, 2);
|
||||
assert.strictEqual(passThrough._readableState.pipes[0], dest);
|
||||
assert.strictEqual(passThrough._readableState.pipes[1], dest);
|
||||
|
||||
passThrough.write('foobar');
|
||||
}
|
||||
|
||||
{
|
||||
const passThrough = new PassThrough();
|
||||
const dest = new Writable({
|
||||
write: common.mustNotCall()
|
||||
});
|
||||
|
||||
passThrough.pipe(dest);
|
||||
passThrough.pipe(dest);
|
||||
|
||||
assert.strictEqual(passThrough._events.data.length, 2);
|
||||
assert.strictEqual(passThrough._readableState.pipesCount, 2);
|
||||
assert.strictEqual(passThrough._readableState.pipes[0], dest);
|
||||
assert.strictEqual(passThrough._readableState.pipes[1], dest);
|
||||
|
||||
passThrough.unpipe(dest);
|
||||
passThrough.unpipe(dest);
|
||||
|
||||
assert.strictEqual(passThrough._events.data, undefined);
|
||||
assert.strictEqual(passThrough._readableState.pipesCount, 0);
|
||||
|
||||
passThrough.write('foobar');
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user