mirror of
https://github.com/zebrajr/node.git
synced 2025-12-06 12:20:27 +01:00
When iterating over diagnostics channel subscribers, assume their count is zero if the list of subscribers becomes undefined, because there may be only one subscriber which may unsubscribe itself as part of its onMessage handler. Signed-off-by: Gabriel Schulhof <gabrielschulhof@gmail.com> PR-URL: https://github.com/nodejs/node/pull/48933 Reviewed-By: Stephen Belanger <admin@stephenbelanger.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Chengzhong Wu <legendecas@gmail.com> Reviewed-By: theanarkh <theratliter@gmail.com>
415 lines
9.6 KiB
JavaScript
415 lines
9.6 KiB
JavaScript
'use strict';
|
|
|
|
const {
|
|
ArrayPrototypeAt,
|
|
ArrayPrototypeIndexOf,
|
|
ArrayPrototypePush,
|
|
ArrayPrototypeSplice,
|
|
SafeFinalizationRegistry,
|
|
ObjectGetPrototypeOf,
|
|
ObjectSetPrototypeOf,
|
|
Promise,
|
|
PromisePrototypeThen,
|
|
PromiseResolve,
|
|
PromiseReject,
|
|
ReflectApply,
|
|
SafeMap,
|
|
SymbolHasInstance,
|
|
} = primordials;
|
|
|
|
const {
|
|
codes: {
|
|
ERR_INVALID_ARG_TYPE,
|
|
},
|
|
} = require('internal/errors');
|
|
const {
|
|
validateFunction,
|
|
} = require('internal/validators');
|
|
|
|
const { triggerUncaughtException } = internalBinding('errors');
|
|
|
|
const { WeakReference } = internalBinding('util');
|
|
|
|
// Can't delete when weakref count reaches 0 as it could increment again.
// Only GC can be used as a valid time to clean up the channels map.
/**
 * Map that stores each value wrapped in a `WeakReference` and registers the
 * value with a finalization registry so that the entry can be removed after
 * the value has been garbage collected.
 */
class WeakRefMap extends SafeMap {
  #finalizers = new SafeFinalizationRegistry((key) => {
    // Finalization callbacks do not run synchronously with GC, so by the time
    // this fires a new value may already have been stored under the same key.
    // Only drop the entry when it no longer resolves to a live value.
    // NOTE(review): assumes WeakReference#get() yields undefined once its
    // target has been collected - confirm against the util binding.
    if (this.get(key) === undefined) {
      this.delete(key);
    }
  });

  /**
   * Stores `value` under `key` through a WeakReference wrapper.
   * @param {any} key
   * @param {object} value
   * @returns {WeakRefMap} Result of the underlying map's `set()`.
   */
  set(key, value) {
    this.#finalizers.register(value, key);
    return super.set(key, new WeakReference(value));
  }

  /**
   * @param {any} key
   * @returns {object|undefined} The referenced value, or undefined when there
   *   is no entry for `key`.
   */
  get(key) {
    return super.get(key)?.get();
  }

  /**
   * Forwards to `incRef()` on the stored reference, if there is one.
   * @param {any} key
   */
  incRef(key) {
    return super.get(key)?.incRef();
  }

  /**
   * Forwards to `decRef()` on the stored reference, if there is one.
   * @param {any} key
   */
  decRef(key) {
    return super.get(key)?.decRef();
  }
}
|
|
|
|
/**
 * Switches a channel to the active implementation: its prototype becomes
 * `ActiveChannel.prototype` and it receives an empty subscriber list and an
 * empty store map.
 * @param {object} channel Channel to activate (mutated in place).
 */
function markActive(channel) {
  // eslint-disable-next-line no-use-before-define
  const activePrototype = ActiveChannel.prototype;
  ObjectSetPrototypeOf(channel, activePrototype);

  channel._subscribers = [];
  channel._stores = new SafeMap();
}
|
|
|
|
/**
 * Restores a channel to the inactive (fast) prototype once it has neither
 * subscribers nor bound stores left; otherwise leaves it untouched.
 * @param {object} channel Currently active channel (mutated in place).
 */
function maybeMarkInactive(channel) {
  // Anything still attached keeps the channel on the active prototype.
  const stillInUse = channel._subscribers.length || channel._stores.size;
  if (stillInUse) return;

  // eslint-disable-next-line no-use-before-define
  const inactivePrototype = Channel.prototype;
  ObjectSetPrototypeOf(channel, inactivePrototype);
  channel._subscribers = undefined;
  channel._stores = undefined;
}
|
|
|
|
/**
 * Identity transform used by `wrapStoreRun()` when no transform is supplied:
 * the published data itself becomes the store context.
 * @param {any} data
 * @returns {any} `data`, unchanged.
 */
function defaultTransform(data) {
  return data;
}
|
|
|
|
/**
 * Builds a thunk that runs `next` inside `store`, using `transform(data)` as
 * the store context.
 *
 * If `transform` throws, the error is reported as an uncaught exception on
 * the next tick and `next` is still invoked, just without entering the store.
 * @param {object} store Object exposing `run(context, fn)`.
 * @param {any} data Value handed to `transform`.
 * @param {Function} next Continuation to execute.
 * @param {Function} [transform] Maps `data` to the store context.
 * @returns {Function} Zero-argument function returning whatever `next` (or
 *   `store.run`) returns.
 */
function wrapStoreRun(store, data, next, transform = defaultTransform) {
  const runInStore = () => {
    let context;

    try {
      context = transform(data);
    } catch (err) {
      // Surface the failure asynchronously so the traced call still proceeds.
      process.nextTick(() => triggerUncaughtException(err, false));
      return next();
    }

    return store.run(context, next);
  };

  return runInStore;
}
|
|
|
|
// Returns a fresh array containing every entry of `subscribers` except the
// one at `skipIndex` (pass -1 to copy all entries).
function copySubscribersExcept(subscribers, skipIndex) {
  const copy = [];
  for (let i = 0; i < subscribers.length; i++) {
    if (i !== skipIndex) ArrayPrototypePush(copy, subscribers[i]);
  }
  return copy;
}

// TODO(qard): should there be a C++ channel interface?
/**
 * Behaviour of a channel that has at least one subscriber or bound store.
 * Instances are not constructed directly: `markActive()` swaps an existing
 * channel object onto this prototype and `maybeMarkInactive()` swaps it back.
 *
 * The subscriber list is treated as copy-on-write: `subscribe()` and
 * `unsubscribe()` replace `_subscribers` with a new array instead of mutating
 * it, so a `publish()` that is in progress keeps iterating an unchanged
 * snapshot even when handlers (un)subscribe while being notified.
 */
class ActiveChannel {
  /**
   * Adds a message handler.
   * @param {Function} subscription Called as `subscription(data, name)`.
   */
  subscribe(subscription) {
    validateFunction(subscription, 'subscription');

    const next = copySubscribersExcept(this._subscribers, -1);
    ArrayPrototypePush(next, subscription);
    this._subscribers = next;

    channels.incRef(this.name);
  }

  /**
   * Removes a previously added message handler.
   * @param {Function} subscription
   * @returns {boolean} `true` if the handler was found and removed.
   */
  unsubscribe(subscription) {
    const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
    if (index === -1) return false;

    // Replace rather than splice in place: an in-place removal would shift
    // the remaining handlers under a running publish() and skip one of them.
    this._subscribers = copySubscribersExcept(this._subscribers, index);

    channels.decRef(this.name);
    maybeMarkInactive(this);

    return true;
  }

  /**
   * Binds a store to this channel, optionally with a transform that derives
   * the store context from published data. Re-binding an already bound store
   * only replaces its transform.
   * @param {object} store
   * @param {Function} [transform]
   */
  bindStore(store, transform) {
    const replacing = this._stores.has(store);
    if (!replacing) channels.incRef(this.name);
    this._stores.set(store, transform);
  }

  /**
   * Removes a bound store.
   * @param {object} store
   * @returns {boolean} `true` if the store was bound and has been removed.
   */
  unbindStore(store) {
    if (!this._stores.has(store)) {
      return false;
    }

    this._stores.delete(store);

    channels.decRef(this.name);
    maybeMarkInactive(this);

    return true;
  }

  get hasSubscribers() {
    return true;
  }

  /**
   * Synchronously delivers `data` to every handler that was subscribed when
   * the call started. An exception thrown by a handler is re-raised as an
   * uncaught exception on the next tick and does not stop delivery.
   * @param {any} data
   */
  publish(data) {
    // Snapshot: handlers may unsubscribe (possibly deactivating the channel
    // and resetting `_subscribers` to undefined) while we iterate.
    const subscribers = this._subscribers;
    if (subscribers === undefined) return;

    const { length } = subscribers;
    for (let i = 0; i < length; i++) {
      try {
        const onMessage = subscribers[i];
        onMessage(data, this.name);
      } catch (err) {
        process.nextTick(() => {
          triggerUncaughtException(err, false);
        });
      }
    }
  }

  /**
   * Publishes `data` and calls `fn` with every bound store entered, each
   * store receiving the context produced by its transform.
   * @param {any} data
   * @param {Function} fn
   * @param {any} [thisArg]
   * @param {...any} args
   * @returns {any} The return value of `fn`.
   */
  runStores(data, fn, thisArg, ...args) {
    let run = () => {
      this.publish(data);
      return ReflectApply(fn, thisArg, args);
    };

    // Wrap the innermost call once per bound store.
    for (const entry of this._stores.entries()) {
      const store = entry[0];
      const transform = entry[1];
      run = wrapStoreRun(store, data, run, transform);
    }

    return run();
  }
}
|
|
|
|
/**
 * A named channel in its inactive state (no subscribers, no bound stores).
 * Every operation is a cheap no-op until the first `subscribe()` or
 * `bindStore()`, which re-prototypes the object to `ActiveChannel` via
 * `markActive()` and then re-dispatches the call.
 */
class Channel {
  /**
   * Creates the channel and registers it in the `channels` map under `name`.
   * @param {string|symbol} name
   */
  constructor(name) {
    this._subscribers = undefined;
    this._stores = undefined;
    this.name = name;

    channels.set(name, this);
  }

  /**
   * Makes `instanceof Channel` true for both inactive and active channels,
   * since an active channel no longer has `Channel.prototype` as prototype.
   * @param {any} instance
   * @returns {boolean}
   */
  static [SymbolHasInstance](instance) {
    // null and undefined have no prototype; asking for it would throw a
    // TypeError instead of answering "not a Channel".
    if (instance === null || instance === undefined) return false;

    const prototype = ObjectGetPrototypeOf(instance);
    return prototype === Channel.prototype ||
           prototype === ActiveChannel.prototype;
  }

  /**
   * Adds the first message handler, activating the channel.
   * @param {Function} subscription
   */
  subscribe(subscription) {
    // Validate before activating: otherwise a rejected argument would leave
    // the channel on the active prototype (hasSubscribers === true) with no
    // subscribers at all.
    validateFunction(subscription, 'subscription');

    markActive(this);
    // The prototype has changed, so this now dispatches to ActiveChannel.
    this.subscribe(subscription);
  }

  /**
   * Nothing can be subscribed to an inactive channel.
   * @returns {boolean} Always `false`.
   */
  unsubscribe() {
    return false;
  }

  /**
   * Binds the first store, activating the channel.
   * @param {object} store
   * @param {Function} [transform]
   */
  bindStore(store, transform) {
    markActive(this);
    // The prototype has changed, so this now dispatches to ActiveChannel.
    this.bindStore(store, transform);
  }

  /**
   * Nothing can be bound to an inactive channel.
   * @returns {boolean} Always `false`.
   */
  unbindStore() {
    return false;
  }

  get hasSubscribers() {
    return false;
  }

  // No subscribers: publishing is a no-op.
  publish() {}

  /**
   * With no stores bound and nobody to publish to, simply calls `fn`.
   * @param {any} data Unused while inactive.
   * @param {Function} fn
   * @param {any} [thisArg]
   * @param {...any} args
   * @returns {any} The return value of `fn`.
   */
  runStores(data, fn, thisArg, ...args) {
    return ReflectApply(fn, thisArg, args);
  }
}
|
|
|
|
// Registry of the channels created so far, keyed by channel name. Values are
// stored through WeakRefMap, i.e. wrapped in weak references.
const channels = new WeakRefMap();
|
|
|
|
/**
 * Returns the channel registered under `name`, creating it on first use.
 * @param {string|symbol} name
 * @returns {Channel}
 * @throws {ERR_INVALID_ARG_TYPE} If no channel exists for `name` and `name`
 *   is neither a string nor a symbol.
 */
function channel(name) {
  const existing = channels.get(name);
  if (existing) return existing;

  const nameType = typeof name;
  if (nameType !== 'string' && nameType !== 'symbol') {
    throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name);
  }

  return new Channel(name);
}
|
|
|
|
/**
 * Subscribes `subscription` to the channel called `name`, creating the
 * channel if needed.
 * @param {string|symbol} name
 * @param {Function} subscription
 * @returns {any} Whatever the channel's `subscribe()` returns.
 */
function subscribe(name, subscription) {
  const target = channel(name);
  return target.subscribe(subscription);
}
|
|
|
|
/**
 * Removes `subscription` from the channel called `name`.
 * @param {string|symbol} name
 * @param {Function} subscription
 * @returns {boolean} Result of the channel's `unsubscribe()`.
 */
function unsubscribe(name, subscription) {
  const target = channel(name);
  return target.unsubscribe(subscription);
}
|
|
|
|
/**
 * Reports whether the channel called `name` has subscribers, without
 * creating the channel when it does not exist yet.
 * @param {string|symbol} name
 * @returns {boolean}
 */
function hasSubscribers(name) {
  const found = channels.get(name);
  return found ? found.hasSubscribers : false;
}
|
|
|
|
// Event names handled by TracingChannel#subscribe()/unsubscribe(); each name
// is looked up both on the handlers object and on the TracingChannel itself.
const traceEvents = ['start', 'end', 'asyncStart', 'asyncEnd', 'error'];
|
|
|
|
/**
 * Ensures `value` is a Channel.
 * @param {any} value Candidate to check.
 * @param {string} name Argument name reported in the error.
 * @throws {ERR_INVALID_ARG_TYPE} If `value` is not an instance of Channel.
 */
function assertChannel(value, name) {
  if (value instanceof Channel) return;

  throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
}
|
|
|
|
/**
 * Bundles the five channels (`start`, `end`, `asyncStart`, `asyncEnd`,
 * `error`) used to trace one operation and provides helpers that publish to
 * them around synchronous, promise-returning and callback-taking functions.
 */
class TracingChannel {
  /**
   * @param {string|object} nameOrChannels Either a name, from which the
   *   channels `tracing:${name}:${event}` are obtained, or an object carrying
   *   a Channel for each of the five events.
   * @throws {ERR_INVALID_ARG_TYPE} If `nameOrChannels` is neither a string
   *   nor a non-null object, or if one of the supplied channels is not a
   *   Channel.
   */
  constructor(nameOrChannels) {
    if (typeof nameOrChannels === 'string') {
      this.start = channel(`tracing:${nameOrChannels}:start`);
      this.end = channel(`tracing:${nameOrChannels}:end`);
      this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`);
      this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
      this.error = channel(`tracing:${nameOrChannels}:error`);
    } else if (typeof nameOrChannels === 'object' && nameOrChannels !== null) {
      // `typeof null` is also 'object'; null must fall through to the
      // argument-type error below rather than fail while destructuring.
      const { start, end, asyncStart, asyncEnd, error } = nameOrChannels;

      assertChannel(start, 'nameOrChannels.start');
      assertChannel(end, 'nameOrChannels.end');
      assertChannel(asyncStart, 'nameOrChannels.asyncStart');
      assertChannel(asyncEnd, 'nameOrChannels.asyncEnd');
      assertChannel(error, 'nameOrChannels.error');

      this.start = start;
      this.end = end;
      this.asyncStart = asyncStart;
      this.asyncEnd = asyncEnd;
      this.error = error;
    } else {
      throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
                                     ['string', 'object', 'Channel'],
                                     nameOrChannels);
    }
  }

  /**
   * Subscribes each provided handler to the channel of the same name.
   * Events without a handler are skipped.
   * @param {object} handlers Handlers keyed by event name.
   */
  subscribe(handlers) {
    for (const name of traceEvents) {
      if (!handlers[name]) continue;

      this[name]?.subscribe(handlers[name]);
    }
  }

  /**
   * Unsubscribes each provided handler from the channel of the same name.
   * @param {object} handlers Handlers keyed by event name.
   * @returns {boolean} `true` only if every provided handler was removed.
   */
  unsubscribe(handlers) {
    let done = true;

    for (const name of traceEvents) {
      if (!handlers[name]) continue;

      if (!this[name]?.unsubscribe(handlers[name])) {
        done = false;
      }
    }

    return done;
  }

  /**
   * Traces a synchronous call. `fn` runs inside `start.runStores()`; its
   * result is stored on `context.result`. If it throws, the error is stored
   * on `context.error`, published on `error` and rethrown. `end` is published
   * in either case.
   * @param {Function} fn
   * @param {object} [context] Object shared with all subscribers.
   * @param {any} [thisArg]
   * @param {...any} args
   * @returns {any} The return value of `fn`.
   */
  traceSync(fn, context = {}, thisArg, ...args) {
    const { start, end, error } = this;

    return start.runStores(context, () => {
      try {
        const result = ReflectApply(fn, thisArg, args);
        context.result = result;
        return result;
      } catch (err) {
        context.error = err;
        error.publish(context);
        throw err;
      } finally {
        end.publish(context);
      }
    });
  }

  /**
   * Traces a promise-returning call. `end` is published as soon as `fn`
   * returns (or throws synchronously); `asyncStart` and `asyncEnd` are
   * published once the promise settles, preceded by `error` on rejection.
   * @param {Function} fn
   * @param {object} [context] Object shared with all subscribers.
   * @param {any} [thisArg]
   * @param {...any} args
   * @returns {Promise} Promise settling like the one returned by `fn`.
   */
  tracePromise(fn, context = {}, thisArg, ...args) {
    const { start, end, asyncStart, asyncEnd, error } = this;

    function reject(err) {
      context.error = err;
      error.publish(context);
      asyncStart.publish(context);
      // TODO: Is there a way to have asyncEnd _after_ the continuation?
      asyncEnd.publish(context);
      return PromiseReject(err);
    }

    function resolve(result) {
      context.result = result;
      asyncStart.publish(context);
      // TODO: Is there a way to have asyncEnd _after_ the continuation?
      asyncEnd.publish(context);
      return result;
    }

    return start.runStores(context, () => {
      try {
        let promise = ReflectApply(fn, thisArg, args);
        // Convert thenables to native promises
        if (!(promise instanceof Promise)) {
          promise = PromiseResolve(promise);
        }
        return PromisePrototypeThen(promise, resolve, reject);
      } catch (err) {
        context.error = err;
        error.publish(context);
        throw err;
      } finally {
        end.publish(context);
      }
    });
  }

  /**
   * Traces a callback-taking call. The argument at `position` is replaced by
   * a wrapper that records the callback's error or result on `context`,
   * publishes `error` when an error was passed, and invokes the original
   * callback inside `asyncStart.runStores()`, publishing `asyncEnd` after it.
   * @param {Function} fn
   * @param {number} [position] Index of the callback within `args`; the
   *   default of -1 selects the last argument.
   * @param {object} [context] Object shared with all subscribers.
   * @param {any} [thisArg]
   * @param {...any} args
   * @returns {any} The return value of `fn`.
   * @throws {ERR_INVALID_ARG_TYPE} If the argument at `position` is not a
   *   function.
   */
  traceCallback(fn, position = -1, context = {}, thisArg, ...args) {
    const { start, end, asyncStart, asyncEnd, error } = this;

    function wrappedCallback(err, res) {
      if (err) {
        context.error = err;
        error.publish(context);
      } else {
        context.result = res;
      }

      // Using runStores here enables manual context failure recovery
      asyncStart.runStores(context, () => {
        try {
          if (callback) {
            // The arrow function deliberately forwards wrappedCallback's own
            // `this` and `arguments` to the original callback.
            return ReflectApply(callback, this, arguments);
          }
        } finally {
          asyncEnd.publish(context);
        }
      });
    }

    const callback = ArrayPrototypeAt(args, position);
    if (typeof callback !== 'function') {
      throw new ERR_INVALID_ARG_TYPE('callback', ['function'], callback);
    }
    ArrayPrototypeSplice(args, position, 1, wrappedCallback);

    return start.runStores(context, () => {
      try {
        return ReflectApply(fn, thisArg, args);
      } catch (err) {
        context.error = err;
        error.publish(context);
        throw err;
      } finally {
        end.publish(context);
      }
    });
  }
}
|
|
|
|
/**
 * Factory for TracingChannel.
 * @param {string|object} nameOrChannels Forwarded to the TracingChannel
 *   constructor.
 * @returns {TracingChannel}
 */
function tracingChannel(nameOrChannels) {
  const tracing = new TracingChannel(nameOrChannels);
  return tracing;
}
|
|
|
|
module.exports = {
|
|
channel,
|
|
hasSubscribers,
|
|
subscribe,
|
|
tracingChannel,
|
|
unsubscribe,
|
|
Channel,
|
|
};
|