[Flight] Basic Streaming Suspense Support (#17285)

* Return whether to keep flowing in Host config

* Emit basic chunk based streaming in the Flight server

When something suspends a new chunk is created.

* Add reentrancy check

The WHATWG API is designed to be pulled recursively.

We should refactor to favor that approach.

* Basic streaming Suspense support on the client

* Add basic suspense in example

* Add comment describing the protocol that the server generates
This commit is contained in:
Sebastian Markbåge 2019-11-06 09:48:34 -08:00 committed by GitHub
parent f50f39b55f
commit dee03049f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 530 additions and 103 deletions

View File

@ -37,8 +37,26 @@
); );
} }
let resolved = false;
let promise = new Promise(resolve => {
setTimeout(() => {
resolved = true;
resolve();
}, 100);
});
function read() {
if (!resolved) {
throw promise;
}
}
function Title() {
read();
return 'Title';
}
let model = { let model = {
title: 'Title', title: <Title />,
content: { content: {
__html: <HTML />, __html: <HTML />,
} }
@ -69,7 +87,9 @@
function Shell({ data }) { function Shell({ data }) {
let model = data.model; let model = data.model;
return <div> return <div>
<h1>{model.title}</h1> <Suspense fallback="...">
<h1>{model.title}</h1>
</Suspense>
<div dangerouslySetInnerHTML={model.content} /> <div dangerouslySetInnerHTML={model.content} />
</div>; </div>;
} }

View File

@ -26,7 +26,7 @@ function startReadingFromStream(response, stream: ReadableStream): void {
return; return;
} }
let buffer: Uint8Array = (value: any); let buffer: Uint8Array = (value: any);
processBinaryChunk(response, buffer, 0); processBinaryChunk(response, buffer);
return reader.read().then(progress, error); return reader.read().then(progress, error);
} }
function error(e) { function error(e) {

View File

@ -23,7 +23,7 @@ function renderToReadableStream(children: ReactNodeList): ReadableStream {
startWork(request); startWork(request);
}, },
pull(controller) { pull(controller) {
startFlowing(request, controller.desiredSize); startFlowing(request);
}, },
cancel(reason) {}, cancel(reason) {},
}); });

View File

@ -13,7 +13,7 @@ import type {Writable} from 'stream';
import {createRequest, startWork, startFlowing} from 'react-server/inline.dom'; import {createRequest, startWork, startFlowing} from 'react-server/inline.dom';
function createDrainHandler(destination, request) { function createDrainHandler(destination, request) {
return () => startFlowing(request, 0); return () => startFlowing(request);
} }
function pipeToNodeWritable( function pipeToNodeWritable(

View File

@ -23,7 +23,7 @@ function renderToReadableStream(model: ReactModel): ReadableStream {
startWork(request); startWork(request);
}, },
pull(controller) { pull(controller) {
startFlowing(request, controller.desiredSize); startFlowing(request);
}, },
cancel(reason) {}, cancel(reason) {},
}); });

View File

@ -17,7 +17,7 @@ import {
} from 'react-server/flight.inline.dom'; } from 'react-server/flight.inline.dom';
function createDrainHandler(destination, request) { function createDrainHandler(destination, request) {
return () => startFlowing(request, 0); return () => startFlowing(request);
} }
function pipeToNodeWritable(model: ReactModel, destination: Writable): void { function pipeToNodeWritable(model: ReactModel, destination: Writable): void {

View File

@ -20,63 +20,224 @@ export type ReactModelRoot<T> = {|
model: T, model: T,
|}; |};
type OpaqueResponse = { type JSONValue = number | null | boolean | string | {[key: string]: JSONValue};
const PENDING = 0;
const RESOLVED = 1;
const ERRORED = 2;
type PendingChunk = {|
status: 0,
value: Promise<void>,
resolve: () => void,
|};
type ResolvedChunk = {|
status: 1,
value: mixed,
resolve: null,
|};
type ErroredChunk = {|
status: 2,
value: Error,
resolve: null,
|};
type Chunk = PendingChunk | ResolvedChunk | ErroredChunk;
type OpaqueResponseWithoutDecoder = {
source: Source, source: Source,
modelRoot: ReactModelRoot<any>,
partialRow: string, partialRow: string,
modelRoot: ReactModelRoot<any>,
chunks: Map<number, Chunk>,
fromJSON: (key: string, value: JSONValue) => any,
};
type OpaqueResponse = OpaqueResponseWithoutDecoder & {
stringDecoder: StringDecoder, stringDecoder: StringDecoder,
rootPing: () => void,
}; };
export function createResponse(source: Source): OpaqueResponse { export function createResponse(source: Source): OpaqueResponse {
let modelRoot = {}; let modelRoot: ReactModelRoot<any> = ({}: any);
Object.defineProperty( let rootChunk: Chunk = createPendingChunk();
modelRoot, definePendingProperty(modelRoot, 'model', rootChunk);
'model', let chunks: Map<number, Chunk> = new Map();
({ chunks.set(0, rootChunk);
configurable: true,
enumerable: true,
get() {
throw rootPromise;
},
}: any),
);
let rootPing; let response: OpaqueResponse = (({
let rootPromise = new Promise(resolve => {
rootPing = resolve;
});
let response: OpaqueResponse = ({
source, source,
modelRoot,
partialRow: '', partialRow: '',
rootPing, modelRoot,
}: any); chunks: chunks,
fromJSON: function(key, value) {
return parseFromJSON(response, this, key, value);
},
}: OpaqueResponseWithoutDecoder): any);
if (supportsBinaryStreams) { if (supportsBinaryStreams) {
response.stringDecoder = createStringDecoder(); response.stringDecoder = createStringDecoder();
} }
return response; return response;
} }
function createPendingChunk(): PendingChunk {
let resolve: () => void = (null: any);
let promise = new Promise(r => (resolve = r));
return {
status: PENDING,
value: promise,
resolve: resolve,
};
}
function createErrorChunk(error: Error): ErroredChunk {
return {
status: ERRORED,
value: error,
resolve: null,
};
}
function triggerErrorOnChunk(chunk: Chunk, error: Error): void {
if (chunk.status !== PENDING) {
// We already resolved. We didn't expect to see this.
return;
}
let resolve = chunk.resolve;
let erroredChunk: ErroredChunk = (chunk: any);
erroredChunk.status = ERRORED;
erroredChunk.value = error;
erroredChunk.resolve = null;
resolve();
}
function createResolvedChunk(value: mixed): ResolvedChunk {
return {
status: RESOLVED,
value: value,
resolve: null,
};
}
function resolveChunk(chunk: Chunk, value: mixed): void {
if (chunk.status !== PENDING) {
// We already resolved. We didn't expect to see this.
return;
}
let resolve = chunk.resolve;
let resolvedChunk: ResolvedChunk = (chunk: any);
resolvedChunk.status = RESOLVED;
resolvedChunk.value = value;
resolvedChunk.resolve = null;
resolve();
}
// Report that any missing chunks in the model is now going to throw this // Report that any missing chunks in the model is now going to throw this
// error upon read. Also notify any pending promises. // error upon read. Also notify any pending promises.
export function reportGlobalError( export function reportGlobalError(
response: OpaqueResponse, response: OpaqueResponse,
error: Error, error: Error,
): void { ): void {
Object.defineProperty( response.chunks.forEach(chunk => {
response.modelRoot, // If this chunk was already resolved or errored, it won't
'model', // trigger an error but if it wasn't then we need to
({ // because we won't be getting any new data to resolve it.
configurable: true, triggerErrorOnChunk(chunk, error);
enumerable: true, });
get() { }
throw error;
}, function definePendingProperty(
}: any), object: Object,
); key: string,
response.rootPing(); chunk: Chunk,
): void {
Object.defineProperty(object, key, {
configurable: false,
enumerable: true,
get() {
if (chunk.status === RESOLVED) {
return chunk.value;
} else {
throw chunk.value;
}
},
});
}
function parseFromJSON(
response: OpaqueResponse,
targetObj: Object,
key: string,
value: JSONValue,
): any {
if (typeof value === 'string' && value[0] === '$') {
if (value[1] === '$') {
// This was an escaped string value.
return value.substring(1);
} else {
let id = parseInt(value.substring(1), 16);
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunk = createPendingChunk();
chunks.set(id, chunk);
} else if (chunk.status === RESOLVED) {
return chunk.value;
}
definePendingProperty(targetObj, key, chunk);
return undefined;
}
}
return value;
}
function resolveJSONRow(
response: OpaqueResponse,
id: number,
json: string,
): void {
let model = JSON.parse(json, response.fromJSON);
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunks.set(id, createResolvedChunk(model));
} else {
resolveChunk(chunk, model);
}
}
function processFullRow(response: OpaqueResponse, row: string): void {
if (row === '') {
return;
}
let tag = row[0];
switch (tag) {
case 'J': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
resolveJSONRow(response, id, json);
return;
}
case 'E': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let errorInfo = JSON.parse(json);
let error = new Error(errorInfo.message);
error.stack = errorInfo.stack;
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunks.set(id, createErrorChunk(error));
} else {
triggerErrorOnChunk(chunk, error);
}
return;
}
default: {
// Assume this is the root model.
resolveJSONRow(response, 0, row);
return;
}
}
} }
export function processStringChunk( export function processStringChunk(
@ -84,36 +245,44 @@ export function processStringChunk(
chunk: string, chunk: string,
offset: number, offset: number,
): void { ): void {
response.partialRow += chunk.substr(offset); let linebreak = chunk.indexOf('\n', offset);
while (linebreak > -1) {
let fullrow = response.partialRow + chunk.substring(offset, linebreak);
processFullRow(response, fullrow);
response.partialRow = '';
offset = linebreak + 1;
linebreak = chunk.indexOf('\n', offset);
}
response.partialRow += chunk.substring(offset);
} }
export function processBinaryChunk( export function processBinaryChunk(
response: OpaqueResponse, response: OpaqueResponse,
chunk: Uint8Array, chunk: Uint8Array,
offset: number,
): void { ): void {
if (!supportsBinaryStreams) { if (!supportsBinaryStreams) {
throw new Error("This environment don't support binary chunks."); throw new Error("This environment don't support binary chunks.");
} }
response.partialRow += readPartialStringChunk(response.stringDecoder, chunk); let stringDecoder = response.stringDecoder;
let linebreak = chunk.indexOf(10); // newline
while (linebreak > -1) {
let fullrow =
response.partialRow +
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
processFullRow(response, fullrow);
response.partialRow = '';
chunk = chunk.subarray(linebreak + 1);
linebreak = chunk.indexOf(10); // newline
}
response.partialRow += readPartialStringChunk(stringDecoder, chunk);
} }
let emptyBuffer = new Uint8Array(0);
export function complete(response: OpaqueResponse): void { export function complete(response: OpaqueResponse): void {
if (supportsBinaryStreams) { // In case there are any remaining unresolved chunks, they won't
// This should never be needed since we're expected to have complete // be resolved now. So we need to issue an error to those.
// code units at the end of JSON. // Ideally we should be able to early bail out if we kept a
response.partialRow += readFinalStringChunk( // ref count of pending chunks.
response.stringDecoder, reportGlobalError(response, new Error('Connection closed.'));
emptyBuffer,
);
}
let modelRoot = response.modelRoot;
let model = JSON.parse(response.partialRow);
Object.defineProperty(modelRoot, 'model', {
value: model,
});
response.rootPing();
} }
export function getModelRoot<T>(response: OpaqueResponse): ReactModelRoot<T> { export function getModelRoot<T>(response: OpaqueResponse): ReactModelRoot<T> {

View File

@ -76,10 +76,7 @@ export function startWork(request: OpaqueRequest): void {
scheduleWork(() => performWork(request)); scheduleWork(() => performWork(request));
} }
export function startFlowing( export function startFlowing(request: OpaqueRequest): void {
request: OpaqueRequest,
desiredBytes: number,
): void {
request.flowing = false; request.flowing = false;
flushCompletedChunks(request); flushCompletedChunks(request);
} }

View File

@ -21,6 +21,63 @@ import {
import {renderHostChildrenToString} from './ReactServerFormatConfig'; import {renderHostChildrenToString} from './ReactServerFormatConfig';
import {REACT_ELEMENT_TYPE} from 'shared/ReactSymbols'; import {REACT_ELEMENT_TYPE} from 'shared/ReactSymbols';
/*
FLIGHT PROTOCOL GRAMMAR
Response
- JSONData RowSequence
- JSONData
RowSequence
- Row RowSequence
- Row
Row
- "J" RowID JSONData
- "H" RowID HTMLData
- "B" RowID BlobData
- "U" RowID URLData
- "E" RowID ErrorData
RowID
- HexDigits ":"
HexDigits
- HexDigit HexDigits
- HexDigit
HexDigit
- 0-F
URLData
- (UTF8 encoded URL) "\n"
ErrorData
- (UTF8 encoded JSON: {message: "...", stack: "..."}) "\n"
JSONData
- (UTF8 encoded JSON) "\n"
- String values that begin with $ are escaped with a "$" prefix.
- References to other rows are encoding as JSONReference strings.
JSONReference
- "$" HexDigits
HTMLData
- ByteSize (UTF8 encoded HTML)
BlobData
- ByteSize (Binary Data)
ByteSize
- (unsigned 32-bit integer)
*/
// TODO: Implement HTMLData, BlobData and URLData.
const stringify = JSON.stringify;
export type ReactModel = export type ReactModel =
| React$Element<any> | React$Element<any>
| string | string
@ -42,66 +99,246 @@ type ReactModelObject = {
+[key: string]: ReactModel, +[key: string]: ReactModel,
}; };
type Segment = {
id: number,
model: ReactModel,
ping: () => void,
};
type OpaqueRequest = { type OpaqueRequest = {
destination: Destination, destination: Destination,
model: ReactModel, nextChunkId: number,
completedChunks: Array<Uint8Array>, pendingChunks: number,
pingedSegments: Array<Segment>,
completedJSONChunks: Array<Uint8Array>,
completedErrorChunks: Array<Uint8Array>,
flowing: boolean, flowing: boolean,
toJSON: (key: string, value: ReactModel) => ReactJSONValue,
}; };
export function createRequest( export function createRequest(
model: ReactModel, model: ReactModel,
destination: Destination, destination: Destination,
): OpaqueRequest { ): OpaqueRequest {
return {destination, model, completedChunks: [], flowing: false}; let pingedSegments = [];
let request = {
destination,
nextChunkId: 0,
pendingChunks: 0,
pingedSegments: pingedSegments,
completedJSONChunks: [],
completedErrorChunks: [],
flowing: false,
toJSON: (key: string, value: ReactModel) =>
resolveModelToJSON(request, value),
};
request.pendingChunks++;
let rootSegment = createSegment(request, model);
pingedSegments.push(rootSegment);
return request;
} }
function resolveModelToJSON(key: string, value: ReactModel): ReactJSONValue { function attemptResolveModelComponent(element: React$Element<any>): ReactModel {
while (value && value.$$typeof === REACT_ELEMENT_TYPE) { let type = element.type;
let props = element.props;
if (typeof type === 'function') {
// This is a nested view model.
return type(props);
} else if (typeof type === 'string') {
// This is a host element. E.g. HTML.
return renderHostChildrenToString(element);
} else {
throw new Error('Unsupported type.');
}
}
function pingSegment(request: OpaqueRequest, segment: Segment): void {
let pingedSegments = request.pingedSegments;
pingedSegments.push(segment);
if (pingedSegments.length === 1) {
scheduleWork(() => performWork(request));
}
}
function createSegment(request: OpaqueRequest, model: ReactModel): Segment {
let id = request.nextChunkId++;
let segment = {
id,
model,
ping: () => pingSegment(request, segment),
};
return segment;
}
function serializeIDRef(id: number): string {
return '$' + id.toString(16);
}
function serializeRowHeader(tag: string, id: number) {
return tag + id.toString(16) + ':';
}
function escapeStringValue(value: string): string {
if (value[0] === '$') {
// We need to escape $ prefixed strings since we use that to encode
// references to IDs.
return '$' + value;
} else {
return value;
}
}
function resolveModelToJSON(
request: OpaqueRequest,
value: ReactModel,
): ReactJSONValue {
if (typeof value === 'string') {
return escapeStringValue(value);
}
while (
typeof value === 'object' &&
value !== null &&
value.$$typeof === REACT_ELEMENT_TYPE
) {
let element: React$Element<any> = (value: any); let element: React$Element<any> = (value: any);
let type = element.type; try {
let props = element.props; value = attemptResolveModelComponent(element);
if (typeof type === 'function') { } catch (x) {
// This is a nested view model. if (typeof x === 'object' && x !== null && typeof x.then === 'function') {
value = type(props); // Something suspended, we'll need to create a new segment and resolve it later.
continue; request.pendingChunks++;
} else if (typeof type === 'string') { let newSegment = createSegment(request, element);
// This is a host element. E.g. HTML. let ping = newSegment.ping;
return renderHostChildrenToString(element); x.then(ping, ping);
} else { return serializeIDRef(newSegment.id);
throw new Error('Unsupported type.'); } else {
request.pendingChunks++;
let errorId = request.nextChunkId++;
emitErrorChunk(request, errorId, x);
return serializeIDRef(errorId);
}
} }
} }
return value; return value;
} }
function emitErrorChunk(
request: OpaqueRequest,
id: number,
error: mixed,
): void {
// TODO: We should not leak error messages to the client in prod.
// Give this an error code instead and log on the server.
// We can serialize the error in DEV as a convenience.
let message;
let stack = '';
try {
if (error instanceof Error) {
message = '' + error.message;
stack = '' + error.stack;
} else {
message = 'Error: ' + (error: any);
}
} catch (x) {
message = 'An error occurred but serializing the error message failed.';
}
let errorInfo = {message, stack};
let row = serializeRowHeader('E', id) + stringify(errorInfo) + '\n';
request.completedErrorChunks.push(convertStringToBuffer(row));
}
function retrySegment(request: OpaqueRequest, segment: Segment): void {
let value = segment.model;
try {
while (
typeof value === 'object' &&
value !== null &&
value.$$typeof === REACT_ELEMENT_TYPE
) {
// If this is a nested model, there's no need to create another chunk,
// we can reuse the existing one and try again.
let element: React$Element<any> = (value: any);
segment.model = element;
value = attemptResolveModelComponent(element);
}
let json = stringify(value, request.toJSON);
let row;
let id = segment.id;
if (id === 0) {
row = json + '\n';
} else {
row = serializeRowHeader('J', id) + json + '\n';
}
request.completedJSONChunks.push(convertStringToBuffer(row));
} catch (x) {
if (typeof x === 'object' && x !== null && typeof x.then === 'function') {
// Something suspended again, let's pick it back up later.
let ping = segment.ping;
x.then(ping, ping);
return;
} else {
// This errored, we need to serialize this error to the
emitErrorChunk(request, segment.id, x);
}
}
}
function performWork(request: OpaqueRequest): void { function performWork(request: OpaqueRequest): void {
let rootModel = request.model; let pingedSegments = request.pingedSegments;
request.model = null; request.pingedSegments = [];
let json = JSON.stringify(rootModel, resolveModelToJSON); for (let i = 0; i < pingedSegments.length; i++) {
request.completedChunks.push(convertStringToBuffer(json)); let segment = pingedSegments[i];
retrySegment(request, segment);
}
if (request.flowing) { if (request.flowing) {
flushCompletedChunks(request); flushCompletedChunks(request);
} }
flushBuffered(request.destination);
} }
function flushCompletedChunks(request: OpaqueRequest) { let reentrant = false;
function flushCompletedChunks(request: OpaqueRequest): void {
if (reentrant) {
return;
}
reentrant = true;
let destination = request.destination; let destination = request.destination;
let chunks = request.completedChunks;
request.completedChunks = [];
beginWriting(destination); beginWriting(destination);
try { try {
for (let i = 0; i < chunks.length; i++) { let jsonChunks = request.completedJSONChunks;
let chunk = chunks[i]; let i = 0;
writeChunk(destination, chunk); for (; i < jsonChunks.length; i++) {
request.pendingChunks--;
let chunk = jsonChunks[i];
if (!writeChunk(destination, chunk)) {
request.flowing = false;
i++;
break;
}
} }
jsonChunks.splice(0, i);
let errorChunks = request.completedErrorChunks;
i = 0;
for (; i < errorChunks.length; i++) {
request.pendingChunks--;
let chunk = errorChunks[i];
if (!writeChunk(destination, chunk)) {
request.flowing = false;
i++;
break;
}
}
errorChunks.splice(0, i);
} finally { } finally {
reentrant = false;
completeWriting(destination); completeWriting(destination);
} }
close(destination); flushBuffered(destination);
if (request.pendingChunks === 0) {
// We're done.
close(destination);
}
} }
export function startWork(request: OpaqueRequest): void { export function startWork(request: OpaqueRequest): void {
@ -109,10 +346,7 @@ export function startWork(request: OpaqueRequest): void {
scheduleWork(() => performWork(request)); scheduleWork(() => performWork(request));
} }
export function startFlowing( export function startFlowing(request: OpaqueRequest): void {
request: OpaqueRequest, request.flowing = true;
desiredBytes: number,
): void {
request.flowing = false;
flushCompletedChunks(request); flushCompletedChunks(request);
} }

View File

@ -20,8 +20,12 @@ export function flushBuffered(destination: Destination) {
export function beginWriting(destination: Destination) {} export function beginWriting(destination: Destination) {}
export function writeChunk(destination: Destination, buffer: Uint8Array) { export function writeChunk(
destination: Destination,
buffer: Uint8Array,
): boolean {
destination.enqueue(buffer); destination.enqueue(buffer);
return destination.desiredSize > 0;
} }
export function completeWriting(destination: Destination) {} export function completeWriting(destination: Destination) {}

View File

@ -40,9 +40,12 @@ export function beginWriting(destination: Destination) {
} }
} }
export function writeChunk(destination: Destination, buffer: Uint8Array) { export function writeChunk(
destination: Destination,
buffer: Uint8Array,
): boolean {
let nodeBuffer = ((buffer: any): Buffer); // close enough let nodeBuffer = ((buffer: any): Buffer); // close enough
destination.write(nodeBuffer); return destination.write(nodeBuffer);
} }
export function completeWriting(destination: Destination) { export function completeWriting(destination: Destination) {