[Fizz] Add Web Streams to Fizz Node entry point (#33475)

New take on #33441.

This uses a wrapper instead of a separate bundle.
This commit is contained in:
Sebastian Markbåge 2025-06-06 20:16:43 -04:00 committed by GitHub
parent b3d5e90786
commit 65ec57df37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 503 additions and 9 deletions

View File

@ -13,6 +13,10 @@ exports.version = l.version;
exports.renderToString = l.renderToString; exports.renderToString = l.renderToString;
exports.renderToStaticMarkup = l.renderToStaticMarkup; exports.renderToStaticMarkup = l.renderToStaticMarkup;
exports.renderToPipeableStream = s.renderToPipeableStream; exports.renderToPipeableStream = s.renderToPipeableStream;
exports.renderToReadableStream = s.renderToReadableStream;
if (s.resumeToPipeableStream) { if (s.resumeToPipeableStream) {
exports.resumeToPipeableStream = s.resumeToPipeableStream; exports.resumeToPipeableStream = s.resumeToPipeableStream;
} }
if (s.resume) {
exports.resume = s.resume;
}

View File

@ -9,4 +9,6 @@ if (process.env.NODE_ENV === 'production') {
exports.version = s.version; exports.version = s.version;
exports.prerenderToNodeStream = s.prerenderToNodeStream; exports.prerenderToNodeStream = s.prerenderToNodeStream;
exports.prerender = s.prerender;
exports.resumeAndPrerenderToNodeStream = s.resumeAndPrerenderToNodeStream; exports.resumeAndPrerenderToNodeStream = s.resumeAndPrerenderToNodeStream;
exports.resumeAndPrerender = s.resumeAndPrerender;

View File

@ -37,3 +37,17 @@ export function resumeToPipeableStream() {
arguments, arguments,
); );
} }
export function renderToReadableStream() {
return require('./src/server/react-dom-server.node').renderToReadableStream.apply(
this,
arguments,
);
}
export function resume() {
return require('./src/server/react-dom-server.node').resume.apply(
this,
arguments,
);
}

View File

@ -56,6 +56,18 @@ describe('ReactDOMFizzServerNode', () => {
throw theInfinitePromise; throw theInfinitePromise;
} }
async function readContentWeb(stream) {
const reader = stream.getReader();
let content = '';
while (true) {
const {done, value} = await reader.read();
if (done) {
return content;
}
content += Buffer.from(value).toString('utf8');
}
}
it('should call renderToPipeableStream', async () => { it('should call renderToPipeableStream', async () => {
const {writable, output} = getTestWritable(); const {writable, output} = getTestWritable();
await act(() => { await act(() => {
@ -67,6 +79,14 @@ describe('ReactDOMFizzServerNode', () => {
expect(output.result).toMatchInlineSnapshot(`"<div>hello world</div>"`); expect(output.result).toMatchInlineSnapshot(`"<div>hello world</div>"`);
}); });
it('should support web streams', async () => {
const stream = await act(() =>
ReactDOMFizzServer.renderToReadableStream(<div>hello world</div>),
);
const result = await readContentWeb(stream);
expect(result).toMatchInlineSnapshot(`"<div>hello world</div>"`);
});
it('flush fully if piping in on onShellReady', async () => { it('flush fully if piping in on onShellReady', async () => {
const {writable, output} = getTestWritable(); const {writable, output} = getTestWritable();
await act(() => { await act(() => {

View File

@ -46,6 +46,18 @@ describe('ReactDOMFizzStaticNode', () => {
}); });
} }
async function readContentWeb(stream) {
const reader = stream.getReader();
let content = '';
while (true) {
const {done, value} = await reader.read();
if (done) {
return content;
}
content += Buffer.from(value).toString('utf8');
}
}
// @gate experimental // @gate experimental
it('should call prerenderToNodeStream', async () => { it('should call prerenderToNodeStream', async () => {
const result = await ReactDOMFizzStatic.prerenderToNodeStream( const result = await ReactDOMFizzStatic.prerenderToNodeStream(
@ -55,6 +67,13 @@ describe('ReactDOMFizzStaticNode', () => {
expect(prelude).toMatchInlineSnapshot(`"<div>hello world</div>"`); expect(prelude).toMatchInlineSnapshot(`"<div>hello world</div>"`);
}); });
// @gate experimental
it('should suppport web streams', async () => {
const result = await ReactDOMFizzStatic.prerender(<div>hello world</div>);
const prelude = await readContentWeb(result.prelude);
expect(prelude).toMatchInlineSnapshot(`"<div>hello world</div>"`);
});
// @gate experimental // @gate experimental
it('should emit DOCTYPE at the root of the document', async () => { it('should emit DOCTYPE at the root of the document', async () => {
const result = await ReactDOMFizzStatic.prerenderToNodeStream( const result = await ReactDOMFizzStatic.prerenderToNodeStream(

View File

@ -41,6 +41,8 @@ import {
createRootFormatContext, createRootFormatContext,
} from 'react-dom-bindings/src/server/ReactFizzConfigDOM'; } from 'react-dom-bindings/src/server/ReactFizzConfigDOM';
import {textEncoder} from 'react-server/src/ReactServerStreamConfigNode';
import {ensureCorrectIsomorphicReactVersion} from '../shared/ensureCorrectIsomorphicReactVersion'; import {ensureCorrectIsomorphicReactVersion} from '../shared/ensureCorrectIsomorphicReactVersion';
ensureCorrectIsomorphicReactVersion(); ensureCorrectIsomorphicReactVersion();
@ -167,6 +169,141 @@ function renderToPipeableStream(
}; };
} }
function createFakeWritableFromReadableStreamController(
controller: ReadableStreamController,
): Writable {
// The current host config expects a Writable so we create
// a fake writable for now to push into the Readable.
return ({
write(chunk: string | Uint8Array) {
if (typeof chunk === 'string') {
chunk = textEncoder.encode(chunk);
}
controller.enqueue(chunk);
// in web streams there is no backpressure so we can alwas write more
return true;
},
end() {
controller.close();
},
destroy(error) {
// $FlowFixMe[method-unbinding]
if (typeof controller.error === 'function') {
// $FlowFixMe[incompatible-call]: This is an Error object or the destination accepts other types.
controller.error(error);
} else {
controller.close();
}
},
}: any);
}
// TODO: Move to sub-classing ReadableStream.
type ReactDOMServerReadableStream = ReadableStream & {
allReady: Promise<void>,
};
type WebStreamsOptions = Omit<
Options,
'onShellReady' | 'onShellError' | 'onAllReady' | 'onHeaders',
> & {signal: AbortSignal, onHeaders?: (headers: Headers) => void};
function renderToReadableStream(
children: ReactNodeList,
options?: WebStreamsOptions,
): Promise<ReactDOMServerReadableStream> {
return new Promise((resolve, reject) => {
let onFatalError;
let onAllReady;
const allReady = new Promise<void>((res, rej) => {
onAllReady = res;
onFatalError = rej;
});
function onShellReady() {
let writable: Writable;
const stream: ReactDOMServerReadableStream = (new ReadableStream(
{
type: 'bytes',
start: (controller): ?Promise<void> => {
writable =
createFakeWritableFromReadableStreamController(controller);
},
pull: (controller): ?Promise<void> => {
startFlowing(request, writable);
},
cancel: (reason): ?Promise<void> => {
stopFlowing(request);
abort(request, reason);
},
},
// $FlowFixMe[prop-missing] size() methods are not allowed on byte streams.
{highWaterMark: 0},
): any);
// TODO: Move to sub-classing ReadableStream.
stream.allReady = allReady;
resolve(stream);
}
function onShellError(error: mixed) {
// If the shell errors the caller of `renderToReadableStream` won't have access to `allReady`.
// However, `allReady` will be rejected by `onFatalError` as well.
// So we need to catch the duplicate, uncatchable fatal error in `allReady` to prevent a `UnhandledPromiseRejection`.
allReady.catch(() => {});
reject(error);
}
const onHeaders = options ? options.onHeaders : undefined;
let onHeadersImpl;
if (onHeaders) {
onHeadersImpl = (headersDescriptor: HeadersDescriptor) => {
onHeaders(new Headers(headersDescriptor));
};
}
const resumableState = createResumableState(
options ? options.identifierPrefix : undefined,
options ? options.unstable_externalRuntimeSrc : undefined,
options ? options.bootstrapScriptContent : undefined,
options ? options.bootstrapScripts : undefined,
options ? options.bootstrapModules : undefined,
);
const request = createRequest(
children,
resumableState,
createRenderState(
resumableState,
options ? options.nonce : undefined,
options ? options.unstable_externalRuntimeSrc : undefined,
options ? options.importMap : undefined,
onHeadersImpl,
options ? options.maxHeadersLength : undefined,
),
createRootFormatContext(options ? options.namespaceURI : undefined),
options ? options.progressiveChunkSize : undefined,
options ? options.onError : undefined,
onAllReady,
onShellReady,
onShellError,
onFatalError,
options ? options.onPostpone : undefined,
options ? options.formState : undefined,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort(request, (signal: any).reason);
} else {
const listener = () => {
abort(request, (signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
startWork(request);
});
}
function resumeRequestImpl( function resumeRequestImpl(
children: ReactNodeList, children: ReactNodeList,
postponedState: PostponedState, postponedState: PostponedState,
@ -225,8 +362,89 @@ function resumeToPipeableStream(
}; };
} }
type WebStreamsResumeOptions = Omit<
Options,
'onShellReady' | 'onShellError' | 'onAllReady',
> & {signal: AbortSignal};
function resume(
children: ReactNodeList,
postponedState: PostponedState,
options?: WebStreamsResumeOptions,
): Promise<ReactDOMServerReadableStream> {
return new Promise((resolve, reject) => {
let onFatalError;
let onAllReady;
const allReady = new Promise<void>((res, rej) => {
onAllReady = res;
onFatalError = rej;
});
function onShellReady() {
let writable: Writable;
const stream: ReactDOMServerReadableStream = (new ReadableStream(
{
type: 'bytes',
start: (controller): ?Promise<void> => {
writable =
createFakeWritableFromReadableStreamController(controller);
},
pull: (controller): ?Promise<void> => {
startFlowing(request, writable);
},
cancel: (reason): ?Promise<void> => {
stopFlowing(request);
abort(request, reason);
},
},
// $FlowFixMe[prop-missing] size() methods are not allowed on byte streams.
{highWaterMark: 0},
): any);
// TODO: Move to sub-classing ReadableStream.
stream.allReady = allReady;
resolve(stream);
}
function onShellError(error: mixed) {
// If the shell errors the caller of `renderToReadableStream` won't have access to `allReady`.
// However, `allReady` will be rejected by `onFatalError` as well.
// So we need to catch the duplicate, uncatchable fatal error in `allReady` to prevent a `UnhandledPromiseRejection`.
allReady.catch(() => {});
reject(error);
}
const request = resumeRequest(
children,
postponedState,
resumeRenderState(
postponedState.resumableState,
options ? options.nonce : undefined,
),
options ? options.onError : undefined,
onAllReady,
onShellReady,
onShellError,
onFatalError,
options ? options.onPostpone : undefined,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort(request, (signal: any).reason);
} else {
const listener = () => {
abort(request, (signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
startWork(request);
});
}
export { export {
renderToPipeableStream, renderToPipeableStream,
renderToReadableStream,
resumeToPipeableStream, resumeToPipeableStream,
resume,
ReactVersion as version, ReactVersion as version,
}; };

View File

@ -159,9 +159,8 @@ function prerender(
type ResumeOptions = { type ResumeOptions = {
nonce?: NonceOption, nonce?: NonceOption,
signal?: AbortSignal, signal?: AbortSignal,
onError?: (error: mixed) => ?string, onError?: (error: mixed, errorInfo: ErrorInfo) => ?string,
onPostpone?: (reason: string) => void, onPostpone?: (reason: string, postponeInfo: PostponeInfo) => void,
unstable_externalRuntimeSrc?: string | BootstrapScriptDescriptor,
}; };
function resumeAndPrerender( function resumeAndPrerender(

View File

@ -28,6 +28,7 @@ import {
resumeAndPrerenderRequest, resumeAndPrerenderRequest,
startWork, startWork,
startFlowing, startFlowing,
stopFlowing,
abort, abort,
getPostponedState, getPostponedState,
} from 'react-server/src/ReactFizzServer'; } from 'react-server/src/ReactFizzServer';
@ -41,6 +42,8 @@ import {
import {enablePostpone, enableHalt} from 'shared/ReactFeatureFlags'; import {enablePostpone, enableHalt} from 'shared/ReactFeatureFlags';
import {textEncoder} from 'react-server/src/ReactServerStreamConfigNode';
import {ensureCorrectIsomorphicReactVersion} from '../shared/ensureCorrectIsomorphicReactVersion'; import {ensureCorrectIsomorphicReactVersion} from '../shared/ensureCorrectIsomorphicReactVersion';
ensureCorrectIsomorphicReactVersion(); ensureCorrectIsomorphicReactVersion();
@ -72,7 +75,36 @@ type StaticResult = {
prelude: Readable, prelude: Readable,
}; };
function createFakeWritable(readable: any): Writable { function createFakeWritableFromReadableStreamController(
controller: ReadableStreamController,
): Writable {
// The current host config expects a Writable so we create
// a fake writable for now to push into the Readable.
return ({
write(chunk: string | Uint8Array) {
if (typeof chunk === 'string') {
chunk = textEncoder.encode(chunk);
}
controller.enqueue(chunk);
// in web streams there is no backpressure so we can alwas write more
return true;
},
end() {
controller.close();
},
destroy(error) {
// $FlowFixMe[method-unbinding]
if (typeof controller.error === 'function') {
// $FlowFixMe[incompatible-call]: This is an Error object or the destination accepts other types.
controller.error(error);
} else {
controller.close();
}
},
}: any);
}
function createFakeWritableFromReadable(readable: any): Writable {
// The current host config expects a Writable so we create // The current host config expects a Writable so we create
// a fake writable for now to push into the Readable. // a fake writable for now to push into the Readable.
return ({ return ({
@ -101,7 +133,7 @@ function prerenderToNodeStream(
startFlowing(request, writable); startFlowing(request, writable);
}, },
}); });
const writable = createFakeWritable(readable); const writable = createFakeWritableFromReadable(readable);
const result: StaticResult = const result: StaticResult =
enablePostpone || enableHalt enablePostpone || enableHalt
@ -157,6 +189,101 @@ function prerenderToNodeStream(
}); });
} }
function prerender(
children: ReactNodeList,
options?: Omit<Options, 'onHeaders'> & {
onHeaders?: (headers: Headers) => void,
},
): Promise<{
postponed: null | PostponedState,
prelude: ReadableStream,
}> {
return new Promise((resolve, reject) => {
const onFatalError = reject;
function onAllReady() {
let writable: Writable;
const stream = new ReadableStream(
{
type: 'bytes',
start: (controller): ?Promise<void> => {
writable =
createFakeWritableFromReadableStreamController(controller);
},
pull: (controller): ?Promise<void> => {
startFlowing(request, writable);
},
cancel: (reason): ?Promise<void> => {
stopFlowing(request);
abort(request, reason);
},
},
// $FlowFixMe[prop-missing] size() methods are not allowed on byte streams.
{highWaterMark: 0},
);
const result =
enablePostpone || enableHalt
? {
postponed: getPostponedState(request),
prelude: stream,
}
: ({
prelude: stream,
}: any);
resolve(result);
}
const onHeaders = options ? options.onHeaders : undefined;
let onHeadersImpl;
if (onHeaders) {
onHeadersImpl = (headersDescriptor: HeadersDescriptor) => {
onHeaders(new Headers(headersDescriptor));
};
}
const resources = createResumableState(
options ? options.identifierPrefix : undefined,
options ? options.unstable_externalRuntimeSrc : undefined,
options ? options.bootstrapScriptContent : undefined,
options ? options.bootstrapScripts : undefined,
options ? options.bootstrapModules : undefined,
);
const request = createPrerenderRequest(
children,
resources,
createRenderState(
resources,
undefined, // nonce is not compatible with prerendered bootstrap scripts
options ? options.unstable_externalRuntimeSrc : undefined,
options ? options.importMap : undefined,
onHeadersImpl,
options ? options.maxHeadersLength : undefined,
),
createRootFormatContext(options ? options.namespaceURI : undefined),
options ? options.progressiveChunkSize : undefined,
options ? options.onError : undefined,
onAllReady,
undefined,
undefined,
onFatalError,
options ? options.onPostpone : undefined,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort(request, (signal: any).reason);
} else {
const listener = () => {
abort(request, (signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
startWork(request);
});
}
type ResumeOptions = { type ResumeOptions = {
nonce?: NonceOption, nonce?: NonceOption,
signal?: AbortSignal, signal?: AbortSignal,
@ -178,7 +305,7 @@ function resumeAndPrerenderToNodeStream(
startFlowing(request, writable); startFlowing(request, writable);
}, },
}); });
const writable = createFakeWritable(readable); const writable = createFakeWritableFromReadable(readable);
const result = { const result = {
postponed: getPostponedState(request), postponed: getPostponedState(request),
@ -216,8 +343,79 @@ function resumeAndPrerenderToNodeStream(
}); });
} }
function resumeAndPrerender(
children: ReactNodeList,
postponedState: PostponedState,
options?: ResumeOptions,
): Promise<{
postponed: null | PostponedState,
prelude: ReadableStream,
}> {
return new Promise((resolve, reject) => {
const onFatalError = reject;
function onAllReady() {
let writable: Writable;
const stream = new ReadableStream(
{
type: 'bytes',
start: (controller): ?Promise<void> => {
writable =
createFakeWritableFromReadableStreamController(controller);
},
pull: (controller): ?Promise<void> => {
startFlowing(request, writable);
},
cancel: (reason): ?Promise<void> => {
stopFlowing(request);
abort(request, reason);
},
},
// $FlowFixMe[prop-missing] size() methods are not allowed on byte streams.
{highWaterMark: 0},
);
const result = {
postponed: getPostponedState(request),
prelude: stream,
};
resolve(result);
}
const request = resumeAndPrerenderRequest(
children,
postponedState,
resumeRenderState(
postponedState.resumableState,
options ? options.nonce : undefined,
),
options ? options.onError : undefined,
onAllReady,
undefined,
undefined,
onFatalError,
options ? options.onPostpone : undefined,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort(request, (signal: any).reason);
} else {
const listener = () => {
abort(request, (signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
startWork(request);
});
}
export { export {
prerender,
prerenderToNodeStream, prerenderToNodeStream,
resumeAndPrerender,
resumeAndPrerenderToNodeStream, resumeAndPrerenderToNodeStream,
ReactVersion as version, ReactVersion as version,
}; };

View File

@ -10,5 +10,7 @@
export * from './ReactDOMFizzServerNode.js'; export * from './ReactDOMFizzServerNode.js';
export { export {
prerenderToNodeStream, prerenderToNodeStream,
prerender,
resumeAndPrerenderToNodeStream, resumeAndPrerenderToNodeStream,
resumeAndPrerender,
} from './ReactDOMFizzStaticNode.js'; } from './ReactDOMFizzStaticNode.js';

View File

@ -7,5 +7,9 @@
* @flow * @flow
*/ */
export {renderToPipeableStream, version} from './ReactDOMFizzServerNode.js'; export {
export {prerenderToNodeStream} from './ReactDOMFizzStaticNode.js'; renderToPipeableStream,
renderToReadableStream,
version,
} from './ReactDOMFizzServerNode.js';
export {prerenderToNodeStream, prerender} from './ReactDOMFizzStaticNode.js';

View File

@ -31,9 +31,23 @@ export function prerenderToNodeStream() {
); );
} }
export function prerender() {
return require('./src/server/react-dom-server.node').prerender.apply(
this,
arguments,
);
}
export function resumeAndPrerenderToNodeStream() { export function resumeAndPrerenderToNodeStream() {
return require('./src/server/react-dom-server.node').resumeAndPrerenderToNodeStream.apply( return require('./src/server/react-dom-server.node').resumeAndPrerenderToNodeStream.apply(
this, this,
arguments, arguments,
); );
} }
export function resumeAndPrerender() {
return require('./src/server/react-dom-server.node').resumeAndPrerender.apply(
this,
arguments,
);
}

View File

@ -189,7 +189,7 @@ export function close(destination: Destination) {
destination.end(); destination.end();
} }
const textEncoder = new TextEncoder(); export const textEncoder: TextEncoder = new TextEncoder();
export function stringToChunk(content: string): Chunk { export function stringToChunk(content: string): Chunk {
return content; return content;