worker: add cpuUsage for worker

PR-URL: https://github.com/nodejs/node/pull/59177
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Ilyas Shabi <ilyasshabi94@gmail.com>
This commit is contained in:
theanarkh 2025-07-28 18:42:44 +08:00 committed by GitHub
parent 0bbe7c36c9
commit 0ba6e0d7ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 225 additions and 1 deletions

View File

@ -1760,6 +1760,19 @@ added: v10.5.0
The `'online'` event is emitted when the worker thread has started executing
JavaScript code.
### `worker.cpuUsage([prev])`
<!-- YAML
added:
- REPLACEME
-->
* Returns: {Promise}
This method returns a `Promise` that will resolve to an object identical to [`process.threadCpuUsage()`][],
or reject with an [`ERR_WORKER_NOT_RUNNING`][] error if the worker is no longer running.
This methods allows the statistics to be observed from outside the actual thread.
### `worker.getHeapSnapshot([options])`
<!-- YAML
@ -2123,6 +2136,7 @@ thread spawned will spawn another until the application crashes.
[`process.stderr`]: process.md#processstderr
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
[`process.threadCpuUsage()`]: process.md#processthreadcpuusagepreviousvalue
[`process.title`]: process.md#processtitle
[`require('node:worker_threads').isMainThread`]: #workerismainthread
[`require('node:worker_threads').parentPort.on('message')`]: #event-message

View File

@ -8,6 +8,7 @@ const {
Float64Array,
FunctionPrototypeBind,
MathMax,
NumberMAX_SAFE_INTEGER,
ObjectEntries,
Promise,
PromiseResolve,
@ -41,6 +42,7 @@ const {
ERR_WORKER_INVALID_EXEC_ARGV,
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_OPERATION_FAILED,
} = errorCodes;
const workerIo = require('internal/worker/io');
@ -61,7 +63,7 @@ const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker
const { deserializeError } = require('internal/error_serdes');
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
const { kEmptyObject } = require('internal/util');
const { validateArray, validateString } = require('internal/validators');
const { validateArray, validateString, validateObject, validateNumber } = require('internal/validators');
const {
throwIfBuildingSnapshot,
} = require('internal/v8/startup_snapshot');
@ -466,6 +468,37 @@ class Worker extends EventEmitter {
};
});
}
cpuUsage(prev) {
if (prev) {
validateObject(prev, 'prev');
validateNumber(prev.user, 'prev.user', 0, NumberMAX_SAFE_INTEGER);
validateNumber(prev.system, 'prev.system', 0, NumberMAX_SAFE_INTEGER);
}
if (process.platform === 'sunos') {
throw new ERR_OPERATION_FAILED('worker.cpuUsage() is not available on SunOS');
}
const taker = this[kHandle]?.cpuUsage();
return new Promise((resolve, reject) => {
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
taker.ondone = (err, current) => {
if (err !== null) {
return reject(err);
}
if (prev) {
resolve({
user: current.user - prev.user,
system: current.system - prev.system,
});
} else {
resolve({
user: current.user,
system: current.system,
});
}
};
});
}
}
/**

View File

@ -79,6 +79,7 @@ namespace node {
V(UDPWRAP) \
V(SIGINTWATCHDOG) \
V(WORKER) \
V(WORKERCPUUSAGE) \
V(WORKERHEAPSNAPSHOT) \
V(WORKERHEAPSTATISTICS) \
V(WRITEWRAP) \

View File

@ -475,6 +475,7 @@
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tty_constructor_template, v8::FunctionTemplate) \
V(write_wrap_template, v8::ObjectTemplate) \
V(worker_cpu_usage_taker_template, v8::ObjectTemplate) \
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
V(x509_constructor_template, v8::FunctionTemplate)

View File

@ -32,6 +32,7 @@ using v8::Isolate;
using v8::Local;
using v8::Locker;
using v8::Maybe;
using v8::Name;
using v8::Null;
using v8::Number;
using v8::Object;
@ -810,6 +811,81 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
}
}
class WorkerCpuUsageTaker : public AsyncWrap {
public:
WorkerCpuUsageTaker(Environment* env, Local<Object> obj)
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERCPUUSAGE) {}
SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(WorkerCpuUsageTaker)
SET_SELF_SIZE(WorkerCpuUsageTaker)
};
void Worker::CpuUsage(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Environment* env = w->env();
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
if (!env->worker_cpu_usage_taker_template()
->NewInstance(env->context())
.ToLocal(&wrap)) {
return;
}
BaseObjectPtr<WorkerCpuUsageTaker> taker =
MakeDetachedBaseObject<WorkerCpuUsageTaker>(env, wrap);
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
env](Environment* worker_env) mutable {
auto cpu_usage_stats = std::make_unique<uv_rusage_t>();
int err = uv_getrusage_thread(cpu_usage_stats.get());
env->SetImmediateThreadsafe(
[taker = std::move(taker),
cpu_usage_stats = std::move(cpu_usage_stats),
err = err](Environment* env) mutable {
Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
Local<Value> argv[] = {
Null(isolate),
Undefined(isolate),
};
if (err) {
argv[0] = UVException(
isolate, err, "uv_getrusage_thread", nullptr, nullptr, nullptr);
} else {
Local<Name> names[] = {
FIXED_ONE_BYTE_STRING(isolate, "user"),
FIXED_ONE_BYTE_STRING(isolate, "system"),
};
Local<Value> values[] = {
Number::New(isolate,
1e6 * cpu_usage_stats->ru_utime.tv_sec +
cpu_usage_stats->ru_utime.tv_usec),
Number::New(isolate,
1e6 * cpu_usage_stats->ru_stime.tv_sec +
cpu_usage_stats->ru_stime.tv_usec),
};
argv[1] = Object::New(
isolate, Null(isolate), names, values, arraysize(names));
}
taker->MakeCallback(env->ondone_string(), arraysize(argv), argv);
},
CallbackFlags::kUnrefed);
});
if (scheduled) {
args.GetReturnValue().Set(wrap);
}
}
class WorkerHeapStatisticsTaker : public AsyncWrap {
public:
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
@ -1101,6 +1177,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
SetProtoMethod(isolate, w, "getHeapStatistics", Worker::GetHeapStatistics);
SetProtoMethod(isolate, w, "cpuUsage", Worker::CpuUsage);
SetConstructorFunction(isolate, target, "Worker", w);
}
@ -1133,6 +1210,19 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
wst->InstanceTemplate());
}
{
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
wst->InstanceTemplate()->SetInternalFieldCount(
WorkerCpuUsageTaker::kInternalFieldCount);
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));
Local<String> wst_string =
FIXED_ONE_BYTE_STRING(isolate, "WorkerCpuUsageTaker");
wst->SetClassName(wst_string);
isolate_data->set_worker_cpu_usage_taker_template(wst->InstanceTemplate());
}
SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
}
@ -1199,6 +1289,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Worker::LoopIdleTime);
registry->Register(Worker::LoopStartTime);
registry->Register(Worker::GetHeapStatistics);
registry->Register(Worker::CpuUsage);
}
} // anonymous namespace

View File

@ -80,6 +80,7 @@ class Worker : public AsyncWrap {
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetHeapStatistics(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void CpuUsage(const v8::FunctionCallbackInfo<v8::Value>& args);
private:
bool CreateEnvMessagePort(Environment* env);

View File

@ -0,0 +1,81 @@
'use strict';
const common = require('../common');
const { isSunOS } = require('../common');
const assert = require('assert');
const {
Worker,
} = require('worker_threads');
function validate(result) {
assert.ok(typeof result == 'object' && result !== null);
assert.ok(result.user >= 0);
assert.ok(result.system >= 0);
assert.ok(Number.isFinite(result.user));
assert.ok(Number.isFinite(result.system));
}
function check(worker) {
[
-1,
1.1,
NaN,
undefined,
{},
[],
null,
function() {},
Symbol(),
true,
Infinity,
{ user: -1, system: 1 },
{ user: 1, system: -1 },
].forEach((value) => {
try {
worker.cpuUsage(value);
} catch (e) {
assert.ok(/ERR_OUT_OF_RANGE|ERR_INVALID_ARG_TYPE/i.test(e.code));
}
});
}
const worker = new Worker(`
const { parentPort } = require('worker_threads');
parentPort.on('message', () => {});
`, { eval: true });
// See test-process-threadCpuUsage-main-thread.js
if (isSunOS) {
assert.throws(
() => worker.cpuUsage(),
{
code: 'ERR_OPERATION_FAILED',
name: 'Error',
message: 'Operation failed: worker.cpuUsage() is not available on SunOS'
}
);
worker.terminate();
} else {
worker.on('online', common.mustCall(async () => {
check(worker);
const prev = await worker.cpuUsage();
validate(prev);
const curr = await worker.cpuUsage();
validate(curr);
assert.ok(curr.user >= prev.user);
assert.ok(curr.system >= prev.system);
const delta = await worker.cpuUsage(curr);
validate(delta);
worker.terminate();
}));
worker.once('exit', common.mustCall(async () => {
await assert.rejects(worker.cpuUsage(), {
code: 'ERR_WORKER_NOT_RUNNING'
});
}));
}

View File

@ -62,6 +62,7 @@ const { getSystemErrorName } = require('util');
delete providers.SIGINTWATCHDOG;
delete providers.WORKERHEAPSNAPSHOT;
delete providers.WORKERHEAPSTATISTICS;
delete providers.WORKERCPUUSAGE;
delete providers.BLOBREADER;
delete providers.RANDOMPRIMEREQUEST;
delete providers.CHECKPRIMEREQUEST;

View File

@ -16,6 +16,7 @@ declare namespace InternalWorkerBinding {
getResourceLimits(): Float64Array;
takeHeapSnapshot(): object;
getHeapStatistics(): Promise<object>;
cpuUsage(): Promise<object>;
loopIdleTime(): number;
loopStartTime(): number;
}