worker: implement MessagePort and MessageChannel

Implement `MessagePort` and `MessageChannel` along the lines of
the DOM classes of the same names. `MessagePort`s initially
support transferring only `ArrayBuffer`s.

Thanks to Stephen Belanger for reviewing this change in its
original form, to Benjamin Gruenbaum for reviewing the
added tests in their original form, and to Olivia Hugger
for reviewing the documentation in its original form.

Refs: https://github.com/ayojs/ayo/pull/98

PR-URL: https://github.com/nodejs/node/pull/20876
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Shingo Inoue <leko.noor@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: John-David Dalton <john.david.dalton@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
This commit is contained in:
Anna Henningsen 2017-09-05 22:38:32 +02:00 committed by Michaël Zasso
parent 9a734132f9
commit 337be58ee6
23 changed files with 1140 additions and 2 deletions

View File

@ -53,6 +53,7 @@
* [Utilities](util.html)
* [V8](v8.html)
* [VM](vm.html)
* [Worker](worker.html)
* [ZLIB](zlib.html)
<div class="line"></div>

View File

@ -46,4 +46,5 @@
@include util
@include v8
@include vm
@include worker
@include zlib

View File

@ -650,12 +650,23 @@ Used when a child process is being forked without specifying an IPC channel.
Used when the main process is trying to read data from the child process's
STDERR / STDOUT, and the data's length is longer than the `maxBuffer` option.
<a id="ERR_CLOSED_MESSAGE_PORT"></a>
### ERR_CLOSED_MESSAGE_PORT
There was an attempt to use a `MessagePort` instance in a closed
state, usually after `.close()` has been called.
<a id="ERR_CONSOLE_WRITABLE_STREAM"></a>
### ERR_CONSOLE_WRITABLE_STREAM
`Console` was instantiated without `stdout` stream, or `Console` has a
non-writable `stdout` or `stderr` stream.
<a id="ERR_CONSTRUCT_CALL_REQUIRED"></a>
### ERR_CONSTRUCT_CALL_REQUIRED
A constructor for a class was called without `new`.
<a id="ERR_CPU_USAGE"></a>
### ERR_CPU_USAGE
@ -1213,6 +1224,11 @@ urlSearchParams.has.call(buf, 'foo');
// Throws a TypeError with code 'ERR_INVALID_THIS'
```
<a id="ERR_INVALID_TRANSFER_OBJECT"></a>
### ERR_INVALID_TRANSFER_OBJECT
An invalid transfer object was passed to `postMessage()`.
<a id="ERR_INVALID_TUPLE"></a>
### ERR_INVALID_TUPLE

146
doc/api/worker.md Normal file
View File

@ -0,0 +1,146 @@
# Worker
<!--introduced_in=REPLACEME-->
> Stability: 1 - Experimental
## Class: MessageChannel
<!-- YAML
added: REPLACEME
-->
Instances of the `worker.MessageChannel` class represent an asynchronous,
two-way communications channel.
The `MessageChannel` has no methods of its own. `new MessageChannel()`
yields an object with `port1` and `port2` properties, which refer to linked
[`MessagePort`][] instances.
```js
const { MessageChannel } = require('worker');
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// prints: received { foo: 'bar' }
```
## Class: MessagePort
<!-- YAML
added: REPLACEME
-->
* Extends: {EventEmitter}
Instances of the `worker.MessagePort` class represent one end of an
asynchronous, two-way communications channel. It can be used to transfer
structured data, memory regions and other `MessagePort`s between different
[`Worker`][]s.
With the exception of `MessagePort`s being [`EventEmitter`][]s rather
than `EventTarget`s, this implementation matches [browser `MessagePort`][]s.
### Event: 'close'
<!-- YAML
added: REPLACEME
-->
The `'close'` event is emitted once either side of the channel has been
disconnected.
### Event: 'message'
<!-- YAML
added: REPLACEME
-->
* `value` {any} The transmitted value
The `'message'` event is emitted for any incoming message, containing the cloned
input of [`port.postMessage()`][].
Listeners on this event will receive a clone of the `value` parameter as passed
to `postMessage()` and no further arguments.
### port.close()
<!-- YAML
added: REPLACEME
-->
Disables further sending of messages on either side of the connection.
This method can be called once you know that no further communication
will happen over this `MessagePort`.
### port.postMessage(value[, transferList])
<!-- YAML
added: REPLACEME
-->
* `value` {any}
* `transferList` {Object[]}
Sends a JavaScript value to the receiving side of this channel.
`value` will be transferred in a way which is compatible with
the [HTML structured clone algorithm][]. In particular, it may contain circular
references and objects like typed arrays that the `JSON` API is not able
to stringify.
`transferList` may be a list of `ArrayBuffer` objects.
After transferring, they will not be usable on the sending side of the channel
anymore (even if they are not contained in `value`).
`value` may still contain `ArrayBuffer` instances that are not in
`transferList`; in that case, the underlying memory is copied rather than moved.
For more information on the serialization and deserialization mechanisms
behind this API, see the [serialization API of the `v8` module][v8.serdes].
Because the object cloning uses the structured clone algorithm,
non-enumerable properties, property accessors, and object prototypes are
not preserved. In particular, [`Buffer`][] objects will be read as
plain [`Uint8Array`][]s on the receiving side.
The message object will be cloned immediately, and can be modified after
posting without having side effects.
### port.ref()
<!-- YAML
added: REPLACEME
-->
Opposite of `unref()`. Calling `ref()` on a previously `unref()`ed port will
*not* let the program exit if it's the only active handle left (the default
behavior). If the port is `ref()`ed, calling `ref()` again will have no effect.
If listeners are attached or removed using `.on('message')`, the port will
be `ref()`ed and `unref()`ed automatically depending on whether
listeners for the event exist.
### port.start()
<!-- YAML
added: REPLACEME
-->
Starts receiving messages on this `MessagePort`. When using this port
as an event emitter, this will be called automatically once `'message'`
listeners are attached.
### port.unref()
<!-- YAML
added: REPLACEME
-->
Calling `unref()` on a port will allow the thread to exit if this is the only
active handle in the event system. If the port is already `unref()`ed calling
`unref()` again will have no effect.
If listeners are attached or removed using `.on('message')`, the port will
be `ref()`ed and `unref()`ed automatically depending on whether
listeners for the event exist.
[`Buffer`]: buffer.html
[`EventEmitter`]: events.html
[`MessagePort`]: #worker_class_messageport
[`port.postMessage()`]: #worker_port_postmessage_value_transferlist
[v8.serdes]: v8.html#v8_serialization_api
[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm

View File

@ -194,7 +194,8 @@
};
NativeModule.isInternal = function(id) {
return id.startsWith('internal/');
return id.startsWith('internal/') ||
(id === 'worker' && !process.binding('config').experimentalWorker);
};
}

View File

@ -105,6 +105,11 @@ const builtinLibs = [
'v8', 'vm', 'zlib'
];
if (process.binding('config').experimentalWorker) {
builtinLibs.push('worker');
builtinLibs.sort();
}
if (typeof process.binding('inspector').open === 'function') {
builtinLibs.push('inspector');
builtinLibs.sort();

105
lib/internal/worker.js Normal file
View File

@ -0,0 +1,105 @@
'use strict';
const EventEmitter = require('events');
const util = require('util');
const { internalBinding } = require('internal/bootstrap/loaders');
const { MessagePort, MessageChannel } = internalBinding('messaging');
const { handle_onclose } = internalBinding('symbols');
util.inherits(MessagePort, EventEmitter);
const kOnMessageListener = Symbol('kOnMessageListener');
const debug = util.debuglog('worker');
// A MessagePort consists of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
debug('received message', payload);
// Emit the deserialized object to userland.
this.emit('message', payload);
};
// This is for compatibility with the Web's MessagePort API. It makes sense to
// provide it as an `EventEmitter` in Node.js, but if somebody overrides
// `onmessage`, we'll switch over to the Web API model.
Object.defineProperty(MessagePort.prototype, 'onmessage', {
enumerable: true,
configurable: true,
get() {
return this[kOnMessageListener];
},
set(value) {
this[kOnMessageListener] = value;
if (typeof value === 'function') {
this.ref();
this.start();
} else {
this.unref();
this.stop();
}
}
});
// This is called from inside the `MessagePort` constructor.
function oninit() {
setupPortReferencing(this, this, 'message');
}
Object.defineProperty(MessagePort.prototype, 'oninit', {
enumerable: true,
writable: false,
value: oninit
});
// This is called after the underlying `uv_async_t` has been closed.
function onclose() {
if (typeof this.onclose === 'function') {
// Not part of the Web standard yet, but there aren't many reasonable
// alternatives in a non-EventEmitter usage setting.
// Refs: https://github.com/whatwg/html/issues/1766
this.onclose();
}
this.emit('close');
}
Object.defineProperty(MessagePort.prototype, handle_onclose, {
enumerable: false,
writable: false,
value: onclose
});
const originalClose = MessagePort.prototype.close;
MessagePort.prototype.close = function(cb) {
if (typeof cb === 'function')
this.once('close', cb);
originalClose.call(this);
};
function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
// If there are none or all are removed, unref() the channel so the worker
// can shutdown gracefully.
port.unref();
eventEmitter.on('newListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.ref();
port.start();
}
});
eventEmitter.on('removeListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.stop();
port.unref();
}
});
}
module.exports = {
MessagePort,
MessageChannel
};

5
lib/worker.js Normal file
View File

@ -0,0 +1,5 @@
'use strict';
const { MessagePort, MessageChannel } = require('internal/worker');
module.exports = { MessagePort, MessageChannel };

View File

@ -78,6 +78,7 @@
'lib/util.js',
'lib/v8.js',
'lib/vm.js',
'lib/worker.js',
'lib/zlib.js',
'lib/internal/assert.js',
'lib/internal/async_hooks.js',
@ -155,6 +156,7 @@
'lib/internal/validators.js',
'lib/internal/stream_base_commons.js',
'lib/internal/vm/module.js',
'lib/internal/worker.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',
@ -333,6 +335,7 @@
'src/node_file.cc',
'src/node_http2.cc',
'src/node_http_parser.cc',
'src/node_messaging.cc',
'src/node_os.cc',
'src/node_platform.cc',
'src/node_perf.cc',
@ -390,6 +393,7 @@
'src/node_http2_state.h',
'src/node_internals.h',
'src/node_javascript.h',
'src/node_messaging.h',
'src/node_mutex.h',
'src/node_perf.h',
'src/node_perf_common.h',

View File

@ -49,6 +49,7 @@ namespace node {
V(HTTP2SETTINGS) \
V(HTTPPARSER) \
V(JSSTREAM) \
V(MESSAGEPORT) \
V(PIPECONNECTWRAP) \
V(PIPESERVERWRAP) \
V(PIPEWRAP) \

View File

@ -193,6 +193,7 @@ struct PackageConfig {
V(main_string, "main") \
V(max_buffer_string, "maxBuffer") \
V(message_string, "message") \
V(message_port_constructor_string, "MessagePort") \
V(minttl_string, "minttl") \
V(modulus_string, "modulus") \
V(name_string, "name") \
@ -212,6 +213,7 @@ struct PackageConfig {
V(onhandshakedone_string, "onhandshakedone") \
V(onhandshakestart_string, "onhandshakestart") \
V(onheaders_string, "onheaders") \
V(oninit_string, "oninit") \
V(onmessage_string, "onmessage") \
V(onnewsession_string, "onnewsession") \
V(onocspresponse_string, "onocspresponse") \
@ -242,6 +244,8 @@ struct PackageConfig {
V(pipe_target_string, "pipeTarget") \
V(pipe_source_string, "pipeSource") \
V(port_string, "port") \
V(port1_string, "port1") \
V(port2_string, "port2") \
V(preference_string, "preference") \
V(priority_string, "priority") \
V(promise_string, "promise") \
@ -323,6 +327,7 @@ struct PackageConfig {
V(http2stream_constructor_template, v8::ObjectTemplate) \
V(immediate_callback_function, v8::Function) \
V(inspector_console_api_object, v8::Object) \
V(message_port_constructor_template, v8::FunctionTemplate) \
V(pbkdf2_constructor_template, v8::ObjectTemplate) \
V(pipe_constructor_template, v8::FunctionTemplate) \
V(performance_entry_callback, v8::Function) \

View File

@ -248,6 +248,11 @@ bool config_experimental_modules = false;
// that is used by lib/vm.js
bool config_experimental_vm_modules = false;
// Set in node.cc by ParseArgs when --experimental-worker is used.
// Used in node_config.cc to set a constant on process.binding('config')
// that is used by lib/worker.js
bool config_experimental_worker = false;
// Set in node.cc by ParseArgs when --experimental-repl-await is used.
// Used in node_config.cc to set a constant on process.binding('config')
// that is used by lib/repl.js.
@ -3104,6 +3109,7 @@ static void PrintHelp() {
" --experimental-vm-modules experimental ES Module support\n"
" in vm module\n"
#endif // defined(NODE_HAVE_I18N_SUPPORT)
" --experimental-worker experimental threaded Worker support\n"
#if HAVE_OPENSSL && NODE_FIPS_MODE
" --force-fips force FIPS crypto (cannot be disabled)\n"
#endif // HAVE_OPENSSL && NODE_FIPS_MODE
@ -3267,6 +3273,7 @@ static void CheckIfAllowedInEnv(const char* exe, bool is_env,
"--experimental-modules",
"--experimental-repl-await",
"--experimental-vm-modules",
"--experimental-worker",
"--expose-http2", // keep as a non-op through v9.x
"--force-fips",
"--icu-data-dir",
@ -3465,6 +3472,8 @@ static void ParseArgs(int* argc,
new_v8_argc += 1;
} else if (strcmp(arg, "--experimental-vm-modules") == 0) {
config_experimental_vm_modules = true;
} else if (strcmp(arg, "--experimental-worker") == 0) {
config_experimental_worker = true;
} else if (strcmp(arg, "--experimental-repl-await") == 0) {
config_experimental_repl_await = true;
} else if (strcmp(arg, "--loader") == 0) {

View File

@ -91,6 +91,9 @@ static void Initialize(Local<Object> target,
if (config_experimental_vm_modules)
READONLY_BOOLEAN_PROPERTY("experimentalVMModules");
if (config_experimental_worker)
READONLY_BOOLEAN_PROPERTY("experimentalWorker");
if (config_experimental_repl_await)
READONLY_BOOLEAN_PROPERTY("experimentalREPLAwait");

View File

@ -19,9 +19,12 @@ namespace node {
#define ERRORS_WITH_CODE(V) \
V(ERR_BUFFER_OUT_OF_BOUNDS, RangeError) \
V(ERR_BUFFER_TOO_LARGE, Error) \
V(ERR_CLOSED_MESSAGE_PORT, Error) \
V(ERR_CONSTRUCT_CALL_REQUIRED, Error) \
V(ERR_INDEX_OUT_OF_RANGE, RangeError) \
V(ERR_INVALID_ARG_VALUE, TypeError) \
V(ERR_INVALID_ARG_TYPE, TypeError) \
V(ERR_INVALID_TRANSFER_OBJECT, TypeError) \
V(ERR_MEMORY_ALLOCATION_FAILED, Error) \
V(ERR_MISSING_ARGS, TypeError) \
V(ERR_MISSING_MODULE, Error) \
@ -48,7 +51,10 @@ namespace node {
// Errors with predefined static messages
#define PREDEFINED_ERROR_MESSAGES(V) \
V(ERR_CLOSED_MESSAGE_PORT, "Cannot send data on closed MessagePort") \
V(ERR_CONSTRUCT_CALL_REQUIRED, "Cannot call constructor without `new`") \
V(ERR_INDEX_OUT_OF_RANGE, "Index out of range") \
V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \
V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory")
#define V(code, message) \

View File

@ -114,6 +114,7 @@ struct sockaddr;
V(http_parser) \
V(inspector) \
V(js_stream) \
V(messaging) \
V(module_wrap) \
V(os) \
V(performance) \
@ -189,6 +190,11 @@ extern bool config_experimental_modules;
// that is used by lib/vm.js
extern bool config_experimental_vm_modules;
// Set in node.cc by ParseArgs when --experimental-vm-modules is used.
// Used in node_config.cc to set a constant on process.binding('config')
// that is used by lib/vm.js
extern bool config_experimental_worker;
// Set in node.cc by ParseArgs when --experimental-repl-await is used.
// Used in node_config.cc to set a constant on process.binding('config')
// that is used by lib/repl.js.

548
src/node_messaging.cc Normal file
View File

@ -0,0 +1,548 @@
#include "node_messaging.h"
#include "node_internals.h"
#include "node_buffer.h"
#include "node_errors.h"
#include "util.h"
#include "util-inl.h"
#include "async_wrap.h"
#include "async_wrap-inl.h"
using v8::Array;
using v8::ArrayBuffer;
using v8::ArrayBufferCreationMode;
using v8::Context;
using v8::EscapableHandleScope;
using v8::Exception;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Isolate;
using v8::Just;
using v8::Local;
using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::String;
using v8::Value;
using v8::ValueDeserializer;
using v8::ValueSerializer;
namespace node {
namespace worker {
Message::Message(MallocedBuffer<char>&& buffer)
: main_message_buf_(std::move(buffer)) {}
namespace {
// This is used to tell V8 how to read transferred host objects, like other
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
class DeserializerDelegate : public ValueDeserializer::Delegate {
public:
DeserializerDelegate(Message* m, Environment* env)
: env_(env), msg_(m) {}
ValueDeserializer* deserializer = nullptr;
private:
Environment* env_;
Message* msg_;
};
} // anonymous namespace
MaybeLocal<Value> Message::Deserialize(Environment* env,
Local<Context> context) {
EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
DeserializerDelegate delegate(this, env);
ValueDeserializer deserializer(
env->isolate(),
reinterpret_cast<const uint8_t*>(main_message_buf_.data),
main_message_buf_.size,
&delegate);
delegate.deserializer = &deserializer;
// Attach all transfered ArrayBuffers to their new Isolate.
for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) {
Local<ArrayBuffer> ab =
ArrayBuffer::New(env->isolate(),
array_buffer_contents_[i].release(),
array_buffer_contents_[i].size,
ArrayBufferCreationMode::kInternalized);
deserializer.TransferArrayBuffer(i, ab);
}
array_buffer_contents_.clear();
if (deserializer.ReadHeader(context).IsNothing())
return MaybeLocal<Value>();
return handle_scope.Escape(
deserializer.ReadValue(context).FromMaybe(Local<Value>()));
}
namespace {
// This tells V8 how to serialize objects that it does not understand
// (e.g. C++ objects) into the output buffer, in a way that our own
// DeserializerDelegate understands how to unpack.
class SerializerDelegate : public ValueSerializer::Delegate {
public:
SerializerDelegate(Environment* env, Local<Context> context, Message* m)
: env_(env), context_(context), msg_(m) {}
void ThrowDataCloneError(Local<String> message) override {
env_->isolate()->ThrowException(Exception::Error(message));
}
ValueSerializer* serializer = nullptr;
private:
Environment* env_;
Local<Context> context_;
Message* msg_;
friend class worker::Message;
};
} // anynomous namespace
Maybe<bool> Message::Serialize(Environment* env,
Local<Context> context,
Local<Value> input,
Local<Value> transfer_list_v) {
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
// Verify that we're not silently overwriting an existing message.
CHECK(main_message_buf_.is_empty());
SerializerDelegate delegate(env, context, this);
ValueSerializer serializer(env->isolate(), &delegate);
delegate.serializer = &serializer;
std::vector<Local<ArrayBuffer>> array_buffers;
if (transfer_list_v->IsArray()) {
Local<Array> transfer_list = transfer_list_v.As<Array>();
uint32_t length = transfer_list->Length();
for (uint32_t i = 0; i < length; ++i) {
Local<Value> entry;
if (!transfer_list->Get(context, i).ToLocal(&entry))
return Nothing<bool>();
// Currently, we support ArrayBuffers.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate and
// take ownership of its memory, copying the buffer will have to do.
if (!ab->IsNeuterable() || ab->IsExternal())
continue;
// We simply use the array index in the `array_buffers` list as the
// ID that we write into the serialized buffer.
uint32_t id = array_buffers.size();
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
}
THROW_ERR_INVALID_TRANSFER_OBJECT(env);
return Nothing<bool>();
}
}
serializer.WriteHeader();
if (serializer.WriteValue(context, input).IsNothing()) {
return Nothing<bool>();
}
for (Local<ArrayBuffer> ab : array_buffers) {
// If serialization succeeded, we want to take ownership of
// (a.k.a. externalize) the underlying memory region and render
// it inaccessible in this Isolate.
ArrayBuffer::Contents contents = ab->Externalize();
ab->Neuter();
array_buffer_contents_.push_back(
MallocedBuffer<char> { static_cast<char*>(contents.Data()),
contents.ByteLength() });
}
// The serializer gave us a buffer allocated using `malloc()`.
std::pair<uint8_t*, size_t> data = serializer.Release();
main_message_buf_ =
MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
return Just(true);
}
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
MessagePortData::~MessagePortData() {
CHECK_EQ(owner_, nullptr);
Disentangle();
}
void MessagePortData::AddToIncomingQueue(Message&& message) {
// This function will be called by other threads.
Mutex::ScopedLock lock(mutex_);
incoming_messages_.emplace_back(std::move(message));
if (owner_ != nullptr)
owner_->TriggerAsync();
}
bool MessagePortData::IsSiblingClosed() const {
Mutex::ScopedLock lock(*sibling_mutex_);
return sibling_ == nullptr;
}
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
CHECK_EQ(a->sibling_, nullptr);
CHECK_EQ(b->sibling_, nullptr);
a->sibling_ = b;
b->sibling_ = a;
a->sibling_mutex_ = b->sibling_mutex_;
}
void MessagePortData::PingOwnerAfterDisentanglement() {
Mutex::ScopedLock lock(mutex_);
if (owner_ != nullptr)
owner_->TriggerAsync();
}
void MessagePortData::Disentangle() {
// Grab a copy of the sibling mutex, then replace it so that each sibling
// has its own sibling_mutex_ now.
std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
Mutex::ScopedLock sibling_lock(*sibling_mutex);
sibling_mutex_ = std::make_shared<Mutex>();
MessagePortData* sibling = sibling_;
if (sibling_ != nullptr) {
sibling_->sibling_ = nullptr;
sibling_ = nullptr;
}
// We close MessagePorts after disentanglement, so we trigger the
// corresponding uv_async_t to let them know that this happened.
PingOwnerAfterDisentanglement();
if (sibling != nullptr) {
sibling->PingOwnerAfterDisentanglement();
}
}
MessagePort::~MessagePort() {
if (data_)
data_->owner_ = nullptr;
}
MessagePort::MessagePort(Environment* env,
Local<Context> context,
Local<Object> wrap)
: HandleWrap(env,
wrap,
reinterpret_cast<uv_handle_t*>(new uv_async_t()),
AsyncWrap::PROVIDER_MESSAGEPORT),
data_(new MessagePortData(this)) {
auto onmessage = [](uv_async_t* handle) {
// Called when data has been put into the queue.
MessagePort* channel = static_cast<MessagePort*>(handle->data);
channel->OnMessage();
};
CHECK_EQ(uv_async_init(env->event_loop(),
async(),
onmessage), 0);
async()->data = static_cast<void*>(this);
Local<Value> fn;
if (!wrap->Get(context, env->oninit_string()).ToLocal(&fn))
return;
if (fn->IsFunction()) {
Local<Function> init = fn.As<Function>();
USE(init->Call(context, wrap, 0, nullptr));
}
}
void MessagePort::AddToIncomingQueue(Message&& message) {
data_->AddToIncomingQueue(std::move(message));
}
uv_async_t* MessagePort::async() {
return reinterpret_cast<uv_async_t*>(GetHandle());
}
void MessagePort::TriggerAsync() {
CHECK_EQ(uv_async_send(async()), 0);
}
void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
if (!args.IsConstructCall()) {
THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
return;
}
Local<Context> context = args.This()->CreationContext();
Context::Scope context_scope(context);
new MessagePort(env, context, args.This());
}
MessagePort* MessagePort::New(
Environment* env,
Local<Context> context,
std::unique_ptr<MessagePortData> data) {
Context::Scope context_scope(context);
Local<Function> ctor;
if (!GetMessagePortConstructor(env, context).ToLocal(&ctor))
return nullptr;
MessagePort* port = nullptr;
// Construct a new instance, then assign the listener instance and possibly
// the MessagePortData to it.
Local<Object> instance;
if (!ctor->NewInstance(context).ToLocal(&instance))
return nullptr;
ASSIGN_OR_RETURN_UNWRAP(&port, instance, nullptr);
if (data) {
port->Detach();
port->data_ = std::move(data);
port->data_->owner_ = port;
// If the existing MessagePortData object had pending messages, this is
// the easiest way to run that queue.
port->TriggerAsync();
}
return port;
}
void MessagePort::OnMessage() {
HandleScope handle_scope(env()->isolate());
Local<Context> context = object()->CreationContext();
// data_ can only ever be modified by the owner thread, so no need to lock.
// However, the message port may be transferred while it is processing
// messages, so we need to check that this handle still owns its `data_` field
// on every iteration.
while (data_) {
Message received;
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
if (!data_->receiving_messages_)
break;
if (data_->incoming_messages_.empty())
break;
received = std::move(data_->incoming_messages_.front());
data_->incoming_messages_.pop_front();
}
if (!env()->can_call_into_js()) {
// In this case there is nothing to do but to drain the current queue.
continue;
}
{
// Call the JS .onmessage() callback.
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(context);
Local<Value> args[] = {
received.Deserialize(env(), context).FromMaybe(Local<Value>())
};
if (args[0].IsEmpty() ||
!object()->Has(context, env()->onmessage_string()).FromMaybe(false) ||
MakeCallback(env()->onmessage_string(), 1, args).IsEmpty()) {
// Re-schedule OnMessage() execution in case of failure.
if (data_)
TriggerAsync();
return;
}
}
}
if (data_ && data_->IsSiblingClosed()) {
Close();
}
}
bool MessagePort::IsSiblingClosed() const {
CHECK(data_);
return data_->IsSiblingClosed();
}
void MessagePort::OnClose() {
if (data_) {
data_->owner_ = nullptr;
data_->Disentangle();
}
data_.reset();
delete async();
}
std::unique_ptr<MessagePortData> MessagePort::Detach() {
Mutex::ScopedLock lock(data_->mutex_);
data_->owner_ = nullptr;
return std::move(data_);
}
void MessagePort::Send(Message&& message) {
Mutex::ScopedLock lock(*data_->sibling_mutex_);
if (data_->sibling_ == nullptr)
return;
data_->sibling_->AddToIncomingQueue(std::move(message));
}
void MessagePort::Send(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Message msg;
if (msg.Serialize(env, object()->CreationContext(), args[0], args[1])
.IsNothing()) {
return;
}
Send(std::move(msg));
}
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
if (!port->data_) {
return THROW_ERR_CLOSED_MESSAGE_PORT(env);
}
if (args.Length() == 0) {
return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
"MessagePort.postMessage");
}
port->Send(args);
}
void MessagePort::Start() {
Mutex::ScopedLock lock(data_->mutex_);
data_->receiving_messages_ = true;
if (!data_->incoming_messages_.empty())
TriggerAsync();
}
void MessagePort::Stop() {
Mutex::ScopedLock lock(data_->mutex_);
data_->receiving_messages_ = false;
}
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
if (!port->data_) {
THROW_ERR_CLOSED_MESSAGE_PORT(env);
return;
}
port->Start();
}
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
if (!port->data_) {
THROW_ERR_CLOSED_MESSAGE_PORT(env);
return;
}
port->Stop();
}
size_t MessagePort::self_size() const {
Mutex::ScopedLock lock(data_->mutex_);
size_t sz = sizeof(*this) + sizeof(*data_);
for (const Message& msg : data_->incoming_messages_)
sz += sizeof(msg) + msg.main_message_buf_.size;
return sz;
}
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
Entangle(a, b->data_.get());
}
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
MessagePortData::Entangle(a->data_.get(), b);
}
MaybeLocal<Function> GetMessagePortConstructor(
Environment* env, Local<Context> context) {
// Factor generating the MessagePort JS constructor into its own piece
// of code, because it is needed early on in the child environment setup.
Local<FunctionTemplate> templ = env->message_port_constructor_template();
if (!templ.IsEmpty())
return templ->GetFunction(context);
{
Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
m->SetClassName(env->message_port_constructor_string());
m->InstanceTemplate()->SetInternalFieldCount(1);
AsyncWrap::AddWrapMethods(env, m);
env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
env->SetProtoMethod(m, "start", MessagePort::Start);
env->SetProtoMethod(m, "stop", MessagePort::Stop);
env->SetProtoMethod(m, "close", HandleWrap::Close);
env->SetProtoMethod(m, "unref", HandleWrap::Unref);
env->SetProtoMethod(m, "ref", HandleWrap::Ref);
env->SetProtoMethod(m, "hasRef", HandleWrap::HasRef);
env->set_message_port_constructor_template(m);
}
return GetMessagePortConstructor(env, context);
}
namespace {
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
if (!args.IsConstructCall()) {
THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
return;
}
Local<Context> context = args.This()->CreationContext();
Context::Scope context_scope(context);
MessagePort* port1 = MessagePort::New(env, context);
MessagePort* port2 = MessagePort::New(env, context);
MessagePort::Entangle(port1, port2);
args.This()->Set(env->context(), env->port1_string(), port1->object())
.FromJust();
args.This()->Set(env->context(), env->port2_string(), port2->object())
.FromJust();
}
static void InitMessaging(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);
{
Local<String> message_channel_string =
FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
templ->SetClassName(message_channel_string);
target->Set(env->context(),
message_channel_string,
templ->GetFunction(context).ToLocalChecked()).FromJust();
}
target->Set(context,
env->message_port_constructor_string(),
GetMessagePortConstructor(env, context).ToLocalChecked())
.FromJust();
}
} // anonymous namespace
} // namespace worker
} // namespace node
NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)

167
src/node_messaging.h Normal file
View File

@ -0,0 +1,167 @@
#ifndef SRC_NODE_MESSAGING_H_
#define SRC_NODE_MESSAGING_H_
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include "env.h"
#include "node_mutex.h"
#include <list>
#include <memory>
namespace node {
namespace worker {
class MessagePortData;
class MessagePort;
// Represents a single communication message.
class Message {
public:
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
Message(Message&& other) = default;
Message& operator=(Message&& other) = default;
Message& operator=(const Message&) = delete;
Message(const Message&) = delete;
// Deserialize the contained JS value. May only be called once, and only
// after Serialize() has been called (e.g. by another thread).
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
v8::Local<v8::Context> context);
// Serialize a JS value, and optionally transfer objects, into this message.
// The Message object retains ownership of all transferred objects until
// deserialization.
v8::Maybe<bool> Serialize(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value> input,
v8::Local<v8::Value> transfer_list);
private:
MallocedBuffer<char> main_message_buf_;
std::vector<MallocedBuffer<char>> array_buffer_contents_;
friend class MessagePort;
};
// This contains all data for a `MessagePort` instance that is not tied to
// a specific Environment/Isolate/event loop, for easier transfer between those.
class MessagePortData {
public:
explicit MessagePortData(MessagePort* owner);
~MessagePortData();
MessagePortData(MessagePortData&& other) = delete;
MessagePortData& operator=(MessagePortData&& other) = delete;
MessagePortData(const MessagePortData& other) = delete;
MessagePortData& operator=(const MessagePortData& other) = delete;
// Add a message to the incoming queue and notify the receiver.
// This may be called from any thread.
void AddToIncomingQueue(Message&& message);
// Returns true if and only this MessagePort is currently not entangled
// with another message port.
bool IsSiblingClosed() const;
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePortData* a, MessagePortData* b);
// Removes any possible sibling. This is thread-safe (it acquires both
// `sibling_mutex_` and `mutex_`), and has to be because it is called once
// the corresponding JS handle handle wants to close
// which can happen on either side of a worker.
void Disentangle();
private:
// After disentangling this message port, the owner handle (if any)
// is asynchronously triggered, so that it can close down naturally.
void PingOwnerAfterDisentanglement();
// This mutex protects all fields below it, with the exception of
// sibling_.
mutable Mutex mutex_;
bool receiving_messages_ = false;
std::list<Message> incoming_messages_;
MessagePort* owner_ = nullptr;
// This mutex protects the sibling_ field and is shared between two entangled
// MessagePorts. If both mutexes are acquired, this one needs to be
// acquired first.
std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>();
MessagePortData* sibling_ = nullptr;
friend class MessagePort;
};
// A message port that receives messages from other threads, including
// the uv_async_t handle that is used to notify the current event loop of
// new incoming messages.
class MessagePort : public HandleWrap {
public:
// Create a new MessagePort. The `context` argument specifies the Context
// instance that is used for creating the values emitted from this port.
MessagePort(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Object> wrap);
~MessagePort();
// Create a new message port instance, optionally over an existing
// `MessagePortData` object.
static MessagePort* New(Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<MessagePortData> data = nullptr);
// Send a message, i.e. deliver it into the sibling's incoming queue.
// If there is no sibling, i.e. this port is closed,
// this message is silently discarded.
void Send(Message&& message);
void Send(const v8::FunctionCallbackInfo<v8::Value>& args);
// Deliver a single message into this port's incoming queue.
void AddToIncomingQueue(Message&& message);
// Start processing messages on this port as a receiving end.
void Start();
// Stop processing messages on this port as a receiving end.
void Stop();
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePort* a, MessagePort* b);
static void Entangle(MessagePort* a, MessagePortData* b);
// Detach this port's data for transferring. After this, the MessagePortData
// is no longer associated with this handle, although it can still receive
// messages.
std::unique_ptr<MessagePortData> Detach();
bool IsSiblingClosed() const;
size_t self_size() const override;
private:
void OnClose() override;
void OnMessage();
void TriggerAsync();
inline uv_async_t* async();
std::unique_ptr<MessagePortData> data_ = nullptr;
friend class MessagePortData;
};
v8::MaybeLocal<v8::Function> GetMessagePortConstructor(
Environment* env, v8::Local<v8::Context> context);
} // namespace worker
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#endif // SRC_NODE_MESSAGING_H_

View File

@ -436,8 +436,11 @@ struct MallocedBuffer {
return ret;
}
inline bool is_empty() const { return data == nullptr; }
MallocedBuffer() : data(nullptr) {}
explicit MallocedBuffer(size_t size) : data(Malloc<T>(size)), size(size) {}
MallocedBuffer(char* data, size_t size) : data(data), size(size) {}
MallocedBuffer(MallocedBuffer&& other) : data(other.data), size(other.size) {
other.data = nullptr;
}

View File

@ -0,0 +1,26 @@
// Flags: --experimental-worker
'use strict';
const common = require('../common');
const assert = require('assert');
const { MessageChannel } = require('worker');
{
const channel = new MessageChannel();
channel.port1.on('message', common.mustCall(({ typedArray }) => {
assert.deepStrictEqual(typedArray, new Uint8Array([0, 1, 2, 3, 4]));
}));
const typedArray = new Uint8Array([0, 1, 2, 3, 4]);
channel.port2.postMessage({ typedArray }, [ typedArray.buffer ]);
assert.strictEqual(typedArray.buffer.byteLength, 0);
channel.port2.close();
}
{
const channel = new MessageChannel();
channel.port1.on('close', common.mustCall());
channel.port2.on('close', common.mustCall());
channel.port2.close();
}

View File

@ -0,0 +1,20 @@
// Flags: --experimental-worker
'use strict';
const common = require('../common');
const assert = require('assert');
const { MessageChannel } = require('worker');
{
const { port1, port2 } = new MessageChannel();
const arrayBuffer = new ArrayBuffer(40);
const typedArray = new Uint32Array(arrayBuffer);
typedArray[0] = 0x12345678;
port1.postMessage(typedArray, [ arrayBuffer ]);
port2.on('message', common.mustCall((received) => {
assert.strictEqual(received[0], 0x12345678);
port2.close(common.mustCall());
}));
}

View File

@ -0,0 +1,56 @@
// Flags: --experimental-worker
'use strict';
const common = require('../common');
const assert = require('assert');
const { MessageChannel, MessagePort } = require('worker');
{
const { port1, port2 } = new MessageChannel();
assert(port1 instanceof MessagePort);
assert(port2 instanceof MessagePort);
const input = { a: 1 };
port1.postMessage(input);
port2.on('message', common.mustCall((received) => {
assert.deepStrictEqual(received, input);
port2.close(common.mustCall());
}));
}
{
const { port1, port2 } = new MessageChannel();
const input = { a: 1 };
port1.postMessage(input);
// Check that the message still gets delivered if `port2` has its
// `on('message')` handler attached at a later point in time.
setImmediate(() => {
port2.on('message', common.mustCall((received) => {
assert.deepStrictEqual(received, input);
port2.close(common.mustCall());
}));
});
}
{
const { port1, port2 } = new MessageChannel();
const input = { a: 1 };
const dummy = common.mustNotCall();
// Check that the message still gets delivered if `port2` has its
// `on('message')` handler attached at a later point in time, even if a
// listener was removed previously.
port2.addListener('message', dummy);
setImmediate(() => {
port2.removeListener('message', dummy);
port1.postMessage(input);
setImmediate(() => {
port2.on('message', common.mustCall((received) => {
assert.deepStrictEqual(received, input);
port2.close(common.mustCall());
}));
});
});
}

View File

@ -35,7 +35,9 @@ common.crashOnUnhandledRejection();
delete providers.HTTP2STREAM;
delete providers.HTTP2PING;
delete providers.HTTP2SETTINGS;
// TODO(addaleax): Test for these
delete providers.STREAMPIPE;
delete providers.MESSAGEPORT;
const objKeys = Object.keys(providers);
if (objKeys.length > 0)

View File

@ -117,7 +117,9 @@ const customTypesMap = {
'Tracing': 'tracing.html#tracing_tracing_object',
'URL': 'url.html#url_the_whatwg_url_api',
'URLSearchParams': 'url.html#url_class_urlsearchparams'
'URLSearchParams': 'url.html#url_class_urlsearchparams',
'MessagePort': 'worker.html#worker_class_messageport'
};
const arrayPart = /(?:\[])+$/;