[Flight] Wait for both streams to end before closing the response (#34301)

When a debug channel is defined, we must ensure that we don't close the
Flight Client's response when the debug channel's readable is done, but
the RSC stream is still flowing. Now, we wait for both streams to end
before closing the response.
This commit is contained in:
Hendrik Liebau 2025-08-26 17:15:25 +02:00 committed by GitHub
parent bb7c9c1b8a
commit cacc20e37c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 514 additions and 149 deletions

View File

@ -101,6 +101,7 @@ function createResponseFromOptions(options: void | Options) {
function startReadingFromUniversalStream(
response: FlightResponse,
stream: ReadableStream,
onDone: () => void,
): void {
// This is the same as startReadingFromStream except this allows WebSocketStreams which
// return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially
@ -116,8 +117,7 @@ function startReadingFromUniversalStream(
...
}): void | Promise<void> {
if (done) {
close(response);
return;
return onDone();
}
if (value instanceof ArrayBuffer) {
// WebSockets can produce ArrayBuffer values in ReadableStreams.
@ -139,7 +139,7 @@ function startReadingFromUniversalStream(
function startReadingFromStream(
response: FlightResponse,
stream: ReadableStream,
isSecondaryStream: boolean,
onDone: () => void,
): void {
const streamState = createStreamState();
const reader = stream.getReader();
@ -152,11 +152,7 @@ function startReadingFromStream(
...
}): void | Promise<void> {
if (done) {
// If we're the secondary stream, then we don't close the response until the debug channel closes.
if (!isSecondaryStream) {
close(response);
}
return;
return onDone();
}
const buffer: Uint8Array = (value: any);
processBinaryChunk(response, streamState, buffer);
@ -178,10 +174,20 @@ function createFromReadableStream<T>(
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromUniversalStream(response, options.debugChannel.readable);
startReadingFromStream(response, stream, true);
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromUniversalStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, stream, handleDone);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);
}
@ -199,13 +205,24 @@ function createFromFetch<T>(
options.debugChannel &&
options.debugChannel.readable
) {
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromUniversalStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, (r.body: any), true);
startReadingFromStream(response, (r.body: any), handleDone);
} else {
startReadingFromStream(response, (r.body: any), false);
startReadingFromStream(
response,
(r.body: any),
close.bind(null, response),
);
}
},
function (e) {

View File

@ -63,7 +63,7 @@ export type Options = {
function startReadingFromStream(
response: Response,
stream: Readable,
isSecondaryStream: boolean,
onEnd: () => void,
): void {
const streamState = createStreamState();
@ -79,13 +79,7 @@ function startReadingFromStream(
reportGlobalError(response, error);
});
stream.on('end', () => {
// If we're the secondary stream, then we don't close the response until the
// debug channel closes.
if (!isSecondaryStream) {
close(response);
}
});
stream.on('end', onEnd);
}
function createFromNodeStream<T>(
@ -112,10 +106,16 @@ function createFromNodeStream<T>(
);
if (__DEV__ && options && options.debugChannel) {
startReadingFromStream(response, options.debugChannel, false);
startReadingFromStream(response, stream, true);
let streamEndedCount = 0;
const handleEnd = () => {
if (++streamEndedCount === 2) {
close(response);
}
};
startReadingFromStream(response, options.debugChannel, handleEnd);
startReadingFromStream(response, stream, handleEnd);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);

View File

@ -102,6 +102,7 @@ function createDebugCallbackFromWritableStream(
function startReadingFromUniversalStream(
response: FlightResponse,
stream: ReadableStream,
onDone: () => void,
): void {
// This is the same as startReadingFromStream except this allows WebSocketStreams which
// return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially
@ -117,8 +118,7 @@ function startReadingFromUniversalStream(
...
}): void | Promise<void> {
if (done) {
close(response);
return;
return onDone();
}
if (value instanceof ArrayBuffer) {
// WebSockets can produce ArrayBuffer values in ReadableStreams.
@ -140,7 +140,7 @@ function startReadingFromUniversalStream(
function startReadingFromStream(
response: FlightResponse,
stream: ReadableStream,
isSecondaryStream: boolean,
onDone: () => void,
): void {
const streamState = createStreamState();
const reader = stream.getReader();
@ -153,11 +153,7 @@ function startReadingFromStream(
...
}): void | Promise<void> {
if (done) {
// If we're the secondary stream, then we don't close the response until the debug channel closes.
if (!isSecondaryStream) {
close(response);
}
return;
return onDone();
}
const buffer: Uint8Array = (value: any);
processBinaryChunk(response, streamState, buffer);
@ -208,10 +204,20 @@ export function createFromReadableStream<T>(
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromUniversalStream(response, options.debugChannel.readable);
startReadingFromStream(response, stream, true);
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromUniversalStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, stream, handleDone);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);
}
@ -250,13 +256,24 @@ export function createFromFetch<T>(
options.debugChannel &&
options.debugChannel.readable
) {
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromUniversalStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, (r.body: any), true);
startReadingFromStream(response, (r.body: any), handleDone);
} else {
startReadingFromStream(response, (r.body: any), false);
startReadingFromStream(
response,
(r.body: any),
close.bind(null, response),
);
}
},
function (e) {

View File

@ -102,7 +102,7 @@ function createResponseFromOptions(options?: Options) {
function startReadingFromStream(
response: FlightResponse,
stream: ReadableStream,
isSecondaryStream: boolean,
onDone: () => void,
): void {
const streamState = createStreamState();
const reader = stream.getReader();
@ -115,12 +115,7 @@ function startReadingFromStream(
...
}): void | Promise<void> {
if (done) {
// If we're the secondary stream, then we don't close the response until
// the debug channel closes.
if (!isSecondaryStream) {
close(response);
}
return;
return onDone();
}
const buffer: Uint8Array = (value: any);
processBinaryChunk(response, streamState, buffer);
@ -144,10 +139,16 @@ export function createFromReadableStream<T>(
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromStream(response, options.debugChannel.readable, false);
startReadingFromStream(response, stream, true);
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromStream(response, options.debugChannel.readable, handleDone);
startReadingFromStream(response, stream, handleDone);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);
@ -166,10 +167,24 @@ export function createFromFetch<T>(
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromStream(response, options.debugChannel.readable, false);
startReadingFromStream(response, (r.body: any), true);
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, (r.body: any), handleDone);
} else {
startReadingFromStream(response, (r.body: any), false);
startReadingFromStream(
response,
(r.body: any),
close.bind(null, response),
);
}
},
function (e) {

View File

@ -59,7 +59,7 @@ export type Options = {
function startReadingFromStream(
response: Response,
stream: Readable,
isSecondaryStream: boolean,
onEnd: () => void,
): void {
const streamState = createStreamState();
@ -75,13 +75,7 @@ function startReadingFromStream(
reportGlobalError(response, error);
});
stream.on('end', () => {
// If we're the secondary stream, then we don't close the response until the
// debug channel closes.
if (!isSecondaryStream) {
close(response);
}
});
stream.on('end', onEnd);
}
export function createFromNodeStream<T>(
@ -104,10 +98,16 @@ export function createFromNodeStream<T>(
);
if (__DEV__ && options && options.debugChannel) {
startReadingFromStream(response, options.debugChannel, false);
startReadingFromStream(response, stream, true);
let streamEndedCount = 0;
const handleEnd = () => {
if (++streamEndedCount === 2) {
close(response);
}
};
startReadingFromStream(response, options.debugChannel, handleEnd);
startReadingFromStream(response, stream, handleEnd);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);

View File

@ -14,14 +14,20 @@ import {patchMessageChannel} from '../../../../scripts/jest/patchMessageChannel'
// Polyfills for test environment
global.ReadableStream =
require('web-streams-polyfill/ponyfill/es6').ReadableStream;
global.WritableStream =
require('web-streams-polyfill/ponyfill/es6').WritableStream;
global.TextEncoder = require('util').TextEncoder;
global.TextDecoder = require('util').TextDecoder;
let React;
let ReactDOMClient;
let ReactServerDOMServer;
let ReactServerDOMClient;
let ReactServerScheduler;
let act;
let serverAct;
let turbopackMap;
let use;
describe('ReactFlightTurbopackDOMBrowser', () => {
beforeEach(() => {
@ -36,16 +42,41 @@ describe('ReactFlightTurbopackDOMBrowser', () => {
jest.mock('react-server-dom-turbopack/server', () =>
require('react-server-dom-turbopack/server.browser'),
);
const TurbopackMock = require('./utils/TurbopackMock');
turbopackMap = TurbopackMock.turbopackMap;
ReactServerDOMServer = require('react-server-dom-turbopack/server.browser');
__unmockReact();
jest.resetModules();
({act} = require('internal-test-utils'));
React = require('react');
ReactDOMClient = require('react-dom/client');
ReactServerDOMClient = require('react-server-dom-turbopack/client');
use = React.use;
});
function createDelayedStream(
stream: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
return new ReadableStream({
async start(controller) {
const reader = stream.getReader();
while (true) {
const {done, value} = await reader.read();
if (done) {
controller.close();
} else {
// Artificially delay between enqueuing chunks.
await new Promise(resolve => setTimeout(resolve));
controller.enqueue(value);
}
}
},
});
}
it('should resolve HTML using W3C streams', async () => {
function Text({children}) {
return <span>{children}</span>;
@ -80,4 +111,56 @@ describe('ReactFlightTurbopackDOMBrowser', () => {
),
});
});
it('does not close the response early when using a fast debug channel', async () => {
function Component() {
return <div>Hi</div>;
}
let debugReadableStreamController;
const debugReadableStream = new ReadableStream({
start(controller) {
debugReadableStreamController = controller;
},
});
const rscStream = await serverAct(() =>
ReactServerDOMServer.renderToReadableStream(<Component />, turbopackMap, {
debugChannel: {
writable: new WritableStream({
write(chunk) {
debugReadableStreamController.enqueue(chunk);
},
close() {
debugReadableStreamController.close();
},
}),
},
}),
);
function ClientRoot({response}) {
return use(response);
}
const response = ReactServerDOMClient.createFromReadableStream(
// Create a delayed stream to simulate that the RSC stream might be
// transported slower than the debug channel, which must not lead to a
// `Connection closed` error in the Flight client.
createDelayedStream(rscStream),
{
debugChannel: {readable: debugReadableStream},
},
);
const container = document.createElement('div');
const root = ReactDOMClient.createRoot(container);
await act(() => {
root.render(<ClientRoot response={response} />);
});
expect(container.innerHTML).toBe('<div>Hi</div>');
});
});

View File

@ -78,6 +78,26 @@ describe('ReactFlightTurbopackDOMEdge', () => {
);
}
function createDelayedStream(
stream: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
return new ReadableStream({
async start(controller) {
const reader = stream.getReader();
while (true) {
const {done, value} = await reader.read();
if (done) {
controller.close();
} else {
// Artificially delay between enqueuing chunks.
await new Promise(resolve => setTimeout(resolve));
controller.enqueue(value);
}
}
},
});
}
it('should allow an alternative module mapping to be used for SSR', async () => {
function ClientComponent() {
return <span>Client Component</span>;
@ -165,6 +185,9 @@ describe('ReactFlightTurbopackDOMEdge', () => {
write(chunk) {
debugReadableStreamController.enqueue(chunk);
},
close() {
debugReadableStreamController.close();
},
}),
},
},
@ -184,10 +207,16 @@ describe('ReactFlightTurbopackDOMEdge', () => {
moduleLoading: null,
};
const response = ReactServerDOMClient.createFromReadableStream(rscStream, {
serverConsumerManifest,
debugChannel: {readable: debugReadableStream},
});
const response = ReactServerDOMClient.createFromReadableStream(
// Create a delayed stream to simulate that the RSC stream might be
// transported slower than the debug channel, which must not lead to a
// `Connection closed` error in the Flight client.
createDelayedStream(rscStream),
{
serverConsumerManifest,
debugChannel: {readable: debugReadableStream},
},
);
let ownerStack;

View File

@ -90,6 +90,18 @@ describe('ReactFlightTurbopackDOMNode', () => {
);
}
function createDelayedStream() {
return new Stream.Transform({
...streamOptions,
transform(chunk, encoding, callback) {
setTimeout(() => {
this.push(chunk);
callback();
});
},
});
}
it('should allow an alternative module mapping to be used for SSR', async () => {
function ClientComponent() {
return <span>Client Component</span>;
@ -180,12 +192,18 @@ describe('ReactFlightTurbopackDOMNode', () => {
debugReadable.write(chunk, encoding);
callback();
},
final() {
debugReadable.end();
},
}),
},
),
);
const readable = new Stream.PassThrough(streamOptions);
// Create a delayed stream to simulate that the RSC stream might be
// transported slower than the debug channel, which must not lead to a
// `controller.enqueueModel is not a function` error in the Flight client.
const readable = createDelayedStream();
rscStream.pipe(readable);

View File

@ -100,6 +100,7 @@ function createResponseFromOptions(options: void | Options) {
function startReadingFromUniversalStream(
response: FlightResponse,
stream: ReadableStream,
onDone: () => void,
): void {
// This is the same as startReadingFromStream except this allows WebSocketStreams which
// return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially
@ -115,8 +116,7 @@ function startReadingFromUniversalStream(
...
}): void | Promise<void> {
if (done) {
close(response);
return;
return onDone();
}
if (value instanceof ArrayBuffer) {
// WebSockets can produce ArrayBuffer values in ReadableStreams.
@ -138,7 +138,7 @@ function startReadingFromUniversalStream(
function startReadingFromStream(
response: FlightResponse,
stream: ReadableStream,
isSecondaryStream: boolean,
onDone: () => void,
): void {
const streamState = createStreamState();
const reader = stream.getReader();
@ -151,11 +151,7 @@ function startReadingFromStream(
...
}): void | Promise<void> {
if (done) {
// If we're the secondary stream, then we don't close the response until the debug channel closes.
if (!isSecondaryStream) {
close(response);
}
return;
return onDone();
}
const buffer: Uint8Array = (value: any);
processBinaryChunk(response, streamState, buffer);
@ -178,10 +174,20 @@ function createFromReadableStream<T>(
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromUniversalStream(response, options.debugChannel.readable);
startReadingFromStream(response, stream, true);
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromUniversalStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, stream, handleDone);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);
}
@ -199,13 +205,24 @@ function createFromFetch<T>(
options.debugChannel &&
options.debugChannel.readable
) {
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromUniversalStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, (r.body: any), true);
startReadingFromStream(response, (r.body: any), handleDone);
} else {
startReadingFromStream(response, (r.body: any), false);
startReadingFromStream(
response,
(r.body: any),
close.bind(null, response),
);
}
},
function (e) {

View File

@ -106,7 +106,7 @@ function createResponseFromOptions(options: Options) {
function startReadingFromStream(
response: FlightResponse,
stream: ReadableStream,
isSecondaryStream: boolean,
onDone: () => void,
): void {
const streamState = createStreamState();
const reader = stream.getReader();
@ -119,12 +119,7 @@ function startReadingFromStream(
...
}): void | Promise<void> {
if (done) {
// If we're the secondary stream, then we don't close the response until
// the debug channel closes.
if (!isSecondaryStream) {
close(response);
}
return;
return onDone();
}
const buffer: Uint8Array = (value: any);
processBinaryChunk(response, streamState, buffer);
@ -148,10 +143,16 @@ function createFromReadableStream<T>(
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromStream(response, options.debugChannel.readable, false);
startReadingFromStream(response, stream, true);
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromStream(response, options.debugChannel.readable, handleDone);
startReadingFromStream(response, stream, handleDone);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);
@ -170,10 +171,24 @@ function createFromFetch<T>(
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromStream(response, options.debugChannel.readable, false);
startReadingFromStream(response, (r.body: any), true);
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, (r.body: any), handleDone);
} else {
startReadingFromStream(response, (r.body: any), false);
startReadingFromStream(
response,
(r.body: any),
close.bind(null, response),
);
}
},
function (e) {

View File

@ -66,7 +66,7 @@ export type Options = {
function startReadingFromStream(
response: Response,
stream: Readable,
isSecondaryStream: boolean,
onEnd: () => void,
): void {
const streamState = createStreamState();
@ -82,13 +82,7 @@ function startReadingFromStream(
reportGlobalError(response, error);
});
stream.on('end', () => {
// If we're the secondary stream, then we don't close the response until the
// debug channel closes.
if (!isSecondaryStream) {
close(response);
}
});
stream.on('end', onEnd);
}
function createFromNodeStream<T>(
@ -114,10 +108,16 @@ function createFromNodeStream<T>(
);
if (__DEV__ && options && options.debugChannel) {
startReadingFromStream(response, options.debugChannel, false);
startReadingFromStream(response, stream, true);
let streamEndedCount = 0;
const handleEnd = () => {
if (++streamEndedCount === 2) {
close(response);
}
};
startReadingFromStream(response, options.debugChannel, handleEnd);
startReadingFromStream(response, stream, handleEnd);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);

View File

@ -12,6 +12,8 @@
// Polyfills for test environment
global.ReadableStream =
require('web-streams-polyfill/ponyfill/es6').ReadableStream;
global.WritableStream =
require('web-streams-polyfill/ponyfill/es6').WritableStream;
global.TextEncoder = require('util').TextEncoder;
global.TextDecoder = require('util').TextDecoder;
@ -152,6 +154,26 @@ describe('ReactFlightDOMBrowser', () => {
return fn.apply(null, args);
}
function createDelayedStream(
stream: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
return new ReadableStream({
async start(controller) {
const reader = stream.getReader();
while (true) {
const {done, value} = await reader.read();
if (done) {
controller.close();
} else {
// Artificially delay between enqueuing chunks.
await new Promise(resolve => setTimeout(resolve));
controller.enqueue(value);
}
}
},
});
}
it('should resolve HTML using W3C streams', async () => {
function Text({children}) {
return <span>{children}</span>;
@ -2693,4 +2715,56 @@ describe('ReactFlightDOMBrowser', () => {
expect(container.innerHTML).toBe('<div></div>');
});
it('does not close the response early when using a fast debug channel', async () => {
function Component() {
return <div>Hi</div>;
}
let debugReadableStreamController;
const debugReadableStream = new ReadableStream({
start(controller) {
debugReadableStreamController = controller;
},
});
const rscStream = await serverAct(() =>
ReactServerDOMServer.renderToReadableStream(<Component />, webpackMap, {
debugChannel: {
writable: new WritableStream({
write(chunk) {
debugReadableStreamController.enqueue(chunk);
},
close() {
debugReadableStreamController.close();
},
}),
},
}),
);
function ClientRoot({response}) {
return use(response);
}
const response = ReactServerDOMClient.createFromReadableStream(
// Create a delayed stream to simulate that the RSC stream might be
// transported slower than the debug channel, which must not lead to a
// `Connection closed` error in the Flight client.
createDelayedStream(rscStream),
{
debugChannel: {readable: debugReadableStream},
},
);
const container = document.createElement('div');
const root = ReactDOMClient.createRoot(container);
await act(() => {
root.render(<ClientRoot response={response} />);
});
expect(container.innerHTML).toBe('<div>Hi</div>');
});
});

View File

@ -233,10 +233,10 @@ describe('ReactFlightDOMEdge', () => {
}
async function createBufferedUnclosingStream(
prelude: ReadableStream<Uint8Array>,
stream: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
const chunks: Array<Uint8Array> = [];
const reader = prelude.getReader();
const reader = stream.getReader();
while (true) {
const {done, value} = await reader.read();
if (done) {
@ -256,6 +256,26 @@ describe('ReactFlightDOMEdge', () => {
});
}
function createDelayedStream(
stream: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
return new ReadableStream({
async start(controller) {
const reader = stream.getReader();
while (true) {
const {done, value} = await reader.read();
if (done) {
controller.close();
} else {
// Artificially delay between enqueuing chunks.
await new Promise(resolve => setTimeout(resolve));
controller.enqueue(value);
}
}
},
});
}
it('should allow an alternative module mapping to be used for SSR', async () => {
function ClientComponent() {
return <span>Client Component</span>;
@ -2012,6 +2032,9 @@ describe('ReactFlightDOMEdge', () => {
write(chunk) {
debugReadableStreamController.enqueue(chunk);
},
close() {
debugReadableStreamController.close();
},
}),
},
},
@ -2032,10 +2055,16 @@ describe('ReactFlightDOMEdge', () => {
moduleLoading: webpackModuleLoading,
};
const response = ReactServerDOMClient.createFromReadableStream(rscStream, {
serverConsumerManifest,
debugChannel: {readable: debugReadableStream},
});
const response = ReactServerDOMClient.createFromReadableStream(
// Create a delayed stream to simulate that the RSC stream might be
// transported slower than the debug channel, which must not lead to a
// `Connection closed` error in the Flight client.
createDelayedStream(rscStream),
{
serverConsumerManifest,
debugChannel: {readable: debugReadableStream},
},
);
let ownerStack;

View File

@ -128,10 +128,10 @@ describe('ReactFlightDOMNode', () => {
}
async function createBufferedUnclosingStream(
prelude: ReadableStream<Uint8Array>,
stream: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
const chunks: Array<Uint8Array> = [];
const reader = prelude.getReader();
const reader = stream.getReader();
while (true) {
const {done, value} = await reader.read();
if (done) {
@ -151,6 +151,19 @@ describe('ReactFlightDOMNode', () => {
});
}
function createDelayedStream() {
return new Stream.Transform({
...streamOptions,
transform(chunk, encoding, callback) {
// Artificially delay between pushing chunks.
setTimeout(() => {
this.push(chunk);
callback();
});
},
});
}
it('should support web streams in node', async () => {
function Text({children}) {
return <span>{children}</span>;
@ -940,12 +953,18 @@ describe('ReactFlightDOMNode', () => {
debugReadable.write(chunk, encoding);
callback();
},
final() {
debugReadable.end();
},
}),
},
),
);
const readable = new Stream.PassThrough(streamOptions);
// Create a delayed stream to simulate that the RSC stream might be
// transported slower than the debug channel, which must not lead to a
// `controller.enqueueModel is not a function` error in the Flight client.
const readable = createDelayedStream();
rscStream.pipe(readable);

View File

@ -100,6 +100,7 @@ function createResponseFromOptions(options: void | Options) {
function startReadingFromUniversalStream(
response: FlightResponse,
stream: ReadableStream,
onDone: () => void,
): void {
// This is the same as startReadingFromStream except this allows WebSocketStreams which
// return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially
@ -115,8 +116,7 @@ function startReadingFromUniversalStream(
...
}): void | Promise<void> {
if (done) {
close(response);
return;
return onDone();
}
if (value instanceof ArrayBuffer) {
// WebSockets can produce ArrayBuffer values in ReadableStreams.
@ -138,7 +138,7 @@ function startReadingFromUniversalStream(
function startReadingFromStream(
response: FlightResponse,
stream: ReadableStream,
isSecondaryStream: boolean,
onDone: () => void,
): void {
const streamState = createStreamState();
const reader = stream.getReader();
@ -151,11 +151,7 @@ function startReadingFromStream(
...
}): void | Promise<void> {
if (done) {
// If we're the secondary stream, then we don't close the response until the debug channel closes.
if (!isSecondaryStream) {
close(response);
}
return;
return onDone();
}
const buffer: Uint8Array = (value: any);
processBinaryChunk(response, streamState, buffer);
@ -178,10 +174,20 @@ function createFromReadableStream<T>(
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromUniversalStream(response, options.debugChannel.readable);
startReadingFromStream(response, stream, true);
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromUniversalStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, stream, handleDone);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);
}
@ -199,13 +205,24 @@ function createFromFetch<T>(
options.debugChannel &&
options.debugChannel.readable
) {
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromUniversalStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, (r.body: any), true);
startReadingFromStream(response, (r.body: any), handleDone);
} else {
startReadingFromStream(response, (r.body: any), false);
startReadingFromStream(
response,
(r.body: any),
close.bind(null, response),
);
}
},
function (e) {

View File

@ -106,7 +106,7 @@ function createResponseFromOptions(options: Options) {
function startReadingFromStream(
response: FlightResponse,
stream: ReadableStream,
isSecondaryStream: boolean,
onDone: () => void,
): void {
const streamState = createStreamState();
const reader = stream.getReader();
@ -119,12 +119,7 @@ function startReadingFromStream(
...
}): void | Promise<void> {
if (done) {
// If we're the secondary stream, then we don't close the response until
// the debug channel closes.
if (!isSecondaryStream) {
close(response);
}
return;
return onDone();
}
const buffer: Uint8Array = (value: any);
processBinaryChunk(response, streamState, buffer);
@ -148,10 +143,16 @@ function createFromReadableStream<T>(
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromStream(response, options.debugChannel.readable, false);
startReadingFromStream(response, stream, true);
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromStream(response, options.debugChannel.readable, handleDone);
startReadingFromStream(response, stream, handleDone);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);
@ -170,10 +171,24 @@ function createFromFetch<T>(
options.debugChannel &&
options.debugChannel.readable
) {
startReadingFromStream(response, options.debugChannel.readable, false);
startReadingFromStream(response, (r.body: any), true);
let streamDoneCount = 0;
const handleDone = () => {
if (++streamDoneCount === 2) {
close(response);
}
};
startReadingFromStream(
response,
options.debugChannel.readable,
handleDone,
);
startReadingFromStream(response, (r.body: any), handleDone);
} else {
startReadingFromStream(response, (r.body: any), false);
startReadingFromStream(
response,
(r.body: any),
close.bind(null, response),
);
}
},
function (e) {

View File

@ -66,7 +66,7 @@ export type Options = {
function startReadingFromStream(
response: Response,
stream: Readable,
isSecondaryStream: boolean,
onEnd: () => void,
): void {
const streamState = createStreamState();
@ -82,13 +82,7 @@ function startReadingFromStream(
reportGlobalError(response, error);
});
stream.on('end', () => {
// If we're the secondary stream, then we don't close the response until the
// debug channel closes.
if (!isSecondaryStream) {
close(response);
}
});
stream.on('end', onEnd);
}
function createFromNodeStream<T>(
@ -114,10 +108,16 @@ function createFromNodeStream<T>(
);
if (__DEV__ && options && options.debugChannel) {
startReadingFromStream(response, options.debugChannel, false);
startReadingFromStream(response, stream, true);
let streamEndedCount = 0;
const handleEnd = () => {
if (++streamEndedCount === 2) {
close(response);
}
};
startReadingFromStream(response, options.debugChannel, handleEnd);
startReadingFromStream(response, stream, handleEnd);
} else {
startReadingFromStream(response, stream, false);
startReadingFromStream(response, stream, close.bind(null, response));
}
return getRoot(response);