worker: optimize cpu profile implement

PR-URL: https://github.com/nodejs/node/pull/59683
Reviewed-By: Juan José Arboleda <soyjuanarbol@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
theanarkh 2025-09-02 00:25:48 +08:00 committed by GitHub
parent fb22c7f414
commit 255dd7b62c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 119 additions and 100 deletions

View File

@ -1398,6 +1398,33 @@ setTimeout(() => {
}, 1000);
```
## Class: `CPUProfileHandle`
<!-- YAML
added: REPLACEME
-->
### `cpuProfileHandle.stop()`
<!-- YAML
added: REPLACEME
-->
* Returns: {Promise}
Stops collecting the profile, then returns a Promise that fulfills with the
profile data, or rejects with an error.
### `cpuProfileHandle[Symbol.asyncDispose]()`
<!-- YAML
added: REPLACEME
-->
* Returns: {Promise}
Stops collecting the profile; the collected profile data is discarded.
## `v8.isStringOneByteRepresentation(content)`
<!-- YAML

View File

@ -1958,19 +1958,16 @@ this matches its values.
If the worker has stopped, the return value is an empty object.
### `worker.startCpuProfile(name)`
### `worker.startCpuProfile()`
<!-- YAML
added: REPLACEME
-->
* name: {string}
* Returns: {Promise}
Starting a CPU profile with the given `name`, then return a Promise that fulfills
with an error or an object which has a `stop` method. Calling the `stop` method will
stop collecting the profile, then return a Promise that fulfills with an error or the
profile data.
Starts a CPU profile, then returns a Promise that fulfills with a
`CPUProfileHandle` object, or rejects with an error. This API supports `await using` syntax.
```cjs
const { Worker } = require('node:worker_threads');
@ -1981,13 +1978,29 @@ const worker = new Worker(`
`, { eval: true });
worker.on('online', async () => {
const handle = await worker.startCpuProfile('demo');
const handle = await worker.startCpuProfile();
const profile = await handle.stop();
console.log(profile);
worker.terminate();
});
```
`await using` example.
```cjs
const { Worker } = require('node:worker_threads');
const w = new Worker(`
const { parentPort } = require('worker_threads');
parentPort.on('message', () => {});
`, { eval: true });
w.on('online', async () => {
  // The profile is stopped automatically when this function returns,
  // and the collected profile data is discarded.
await using handle = await w.startCpuProfile();
});
```
### `worker.stderr`
<!-- YAML

View File

@ -134,6 +134,37 @@ function assignEnvironmentData(data) {
});
}
class CPUProfileHandle {
  #worker = null;
  #id = null;
  #promise = null;

  /**
   * Handle for an in-progress CPU profile on a worker thread.
   * Returned by `worker.startCpuProfile()`; supports `await using`.
   * @param {Worker} worker - Worker whose profile this handle controls.
   * @param {number} id - Profiler id produced by the start operation.
   */
  constructor(worker, id) {
    this.#worker = worker;
    this.#id = id;
  }

  /**
   * Stops collecting the profile.
   * The promise is cached, so repeated calls return the same promise and
   * the underlying profiler is stopped at most once.
   * @returns {Promise} Fulfills with the profile data; rejects with
   *   ERR_WORKER_NOT_RUNNING if the worker has already exited, or with the
   *   error reported by the stop operation.
   */
  stop() {
    if (this.#promise) {
      return this.#promise;
    }
    // kHandle is gone once the worker has exited, hence the optional chain.
    const stopTaker = this.#worker[kHandle]?.stopCpuProfile(this.#id);
    return this.#promise = new Promise((resolve, reject) => {
      if (!stopTaker) return reject(new ERR_WORKER_NOT_RUNNING());
      stopTaker.ondone = (err, profile) => {
        if (err) {
          return reject(err);
        }
        resolve(profile);
      };
    });
  }  // NOTE: removed stray `;` after the method body (empty class element).

  /**
   * `await using` support: stops the profile on disposal; the profile data
   * is discarded.
   * NOTE(review): if the worker already exited, stop() rejects and that
   * rejection propagates out of disposal — confirm this is intended.
   */
  async [SymbolAsyncDispose]() {
    await this.stop();
  }
}
class Worker extends EventEmitter {
constructor(filename, options = kEmptyObject) {
throwIfBuildingSnapshot('Creating workers');
@ -508,37 +539,15 @@ class Worker extends EventEmitter {
}
// TODO(theanarkh): add options, such as sample_interval, CpuProfilingMode
startCpuProfile(name) {
validateString(name, 'name');
const startTaker = this[kHandle]?.startCpuProfile(name);
startCpuProfile() {
const startTaker = this[kHandle]?.startCpuProfile();
return new Promise((resolve, reject) => {
if (!startTaker) return reject(new ERR_WORKER_NOT_RUNNING());
startTaker.ondone = (err) => {
startTaker.ondone = (err, id) => {
if (err) {
return reject(err);
}
let promise = null;
const stop = () => {
if (promise) {
return promise;
}
const stopTaker = this[kHandle]?.stopCpuProfile(name);
return promise = new Promise((resolve, reject) => {
if (!stopTaker) return reject(new ERR_WORKER_NOT_RUNNING());
stopTaker.ondone = (status, profile) => {
if (err) {
return reject(err);
}
resolve(profile);
};
});
};
resolve({
stop,
async [SymbolAsyncDispose]() {
await stop();
},
});
resolve(new CPUProfileHandle(this, id));
};
});
}

View File

@ -1064,7 +1064,7 @@ Environment::~Environment() {
delete external_memory_accounter_;
if (cpu_profiler_) {
for (auto& it : pending_profiles_) {
cpu_profiler_->Stop(it.second);
cpu_profiler_->Stop(it);
}
cpu_profiler_->Dispose();
cpu_profiler_ = nullptr;
@ -2233,30 +2233,30 @@ void Environment::RunWeakRefCleanup() {
isolate()->ClearKeptObjects();
}
v8::CpuProfilingResult Environment::StartCpuProfile(std::string_view name) {
v8::CpuProfilingResult Environment::StartCpuProfile() {
HandleScope handle_scope(isolate());
if (!cpu_profiler_) {
cpu_profiler_ = v8::CpuProfiler::New(isolate());
}
Local<Value> title =
node::ToV8Value(context(), name, isolate()).ToLocalChecked();
v8::CpuProfilingResult result =
cpu_profiler_->Start(title.As<String>(), true);
v8::CpuProfilingResult result = cpu_profiler_->Start(
v8::CpuProfilingOptions{v8::CpuProfilingMode::kLeafNodeLineNumbers,
v8::CpuProfilingOptions::kNoSampleLimit});
if (result.status == v8::CpuProfilingStatus::kStarted) {
pending_profiles_.emplace(name, result.id);
pending_profiles_.push_back(result.id);
}
return result;
}
v8::CpuProfile* Environment::StopCpuProfile(std::string_view name) {
v8::CpuProfile* Environment::StopCpuProfile(v8::ProfilerId profile_id) {
if (!cpu_profiler_) {
return nullptr;
}
auto it = pending_profiles_.find(std::string(name));
auto it =
std::find(pending_profiles_.begin(), pending_profiles_.end(), profile_id);
if (it == pending_profiles_.end()) {
return nullptr;
}
v8::CpuProfile* profile = cpu_profiler_->Stop(it->second);
v8::CpuProfile* profile = cpu_profiler_->Stop(*it);
pending_profiles_.erase(it);
return profile;
}

View File

@ -1049,8 +1049,8 @@ class Environment final : public MemoryRetainer {
inline void RemoveHeapSnapshotNearHeapLimitCallback(size_t heap_limit);
v8::CpuProfilingResult StartCpuProfile(std::string_view name);
v8::CpuProfile* StopCpuProfile(std::string_view name);
v8::CpuProfilingResult StartCpuProfile();
v8::CpuProfile* StopCpuProfile(v8::ProfilerId profile_id);
// Field identifiers for exit_info_
enum ExitInfoField {
@ -1250,7 +1250,7 @@ class Environment final : public MemoryRetainer {
released_allocated_buffers_;
v8::CpuProfiler* cpu_profiler_ = nullptr;
std::unordered_map<std::string, v8::ProfilerId> pending_profiles_;
std::vector<v8::ProfilerId> pending_profiles_;
};
} // namespace node

View File

@ -48,7 +48,6 @@ void OOMErrorHandler(const char* location, const v8::OOMDetails& details);
V(ERR_CLOSED_MESSAGE_PORT, Error) \
V(ERR_CONSTRUCT_CALL_REQUIRED, TypeError) \
V(ERR_CONSTRUCT_CALL_INVALID, TypeError) \
V(ERR_CPU_PROFILE_ALREADY_STARTED, Error) \
V(ERR_CPU_PROFILE_NOT_STARTED, Error) \
V(ERR_CPU_PROFILE_TOO_MANY, Error) \
V(ERR_CRYPTO_CUSTOM_ENGINE_NOT_SUPPORTED, Error) \

View File

@ -915,9 +915,6 @@ void Worker::StartCpuProfile(const FunctionCallbackInfo<Value>& args) {
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Environment* env = w->env();
CHECK(args[0]->IsString());
node::Utf8Value name(env->isolate(), args[0]);
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
if (!env->worker_cpu_profile_taker_template()
@ -930,25 +927,23 @@ void Worker::StartCpuProfile(const FunctionCallbackInfo<Value>& args) {
MakeDetachedBaseObject<WorkerCpuProfileTaker>(env, wrap);
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
name = name.ToString(),
env](Environment* worker_env) mutable {
CpuProfilingResult result = worker_env->StartCpuProfile(name);
CpuProfilingResult result = worker_env->StartCpuProfile();
env->SetImmediateThreadsafe(
[taker = std::move(taker),
status = result.status](Environment* env) mutable {
[taker = std::move(taker), result = result](Environment* env) mutable {
Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
Local<Value> argv[] = {
Null(isolate), // error
Null(isolate), // error
Undefined(isolate), // profile id
};
if (status == CpuProfilingStatus::kAlreadyStarted) {
argv[0] = ERR_CPU_PROFILE_ALREADY_STARTED(
isolate, "CPU profile already started");
} else if (status == CpuProfilingStatus::kErrorTooManyProfilers) {
if (result.status == CpuProfilingStatus::kErrorTooManyProfilers) {
argv[0] = ERR_CPU_PROFILE_TOO_MANY(
isolate, "There are too many CPU profiles");
} else if (result.status == CpuProfilingStatus::kStarted) {
argv[1] = Number::New(isolate, result.id);
}
taker->MakeCallback(env->ondone_string(), arraysize(argv), argv);
},
@ -965,8 +960,8 @@ void Worker::StopCpuProfile(const FunctionCallbackInfo<Value>& args) {
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Environment* env = w->env();
CHECK(args[0]->IsString());
node::Utf8Value name(env->isolate(), args[0]);
CHECK(args[0]->IsUint32());
uint32_t profile_id = args[0]->Uint32Value(env->context()).FromJust();
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
@ -980,11 +975,11 @@ void Worker::StopCpuProfile(const FunctionCallbackInfo<Value>& args) {
MakeDetachedBaseObject<WorkerCpuProfileTaker>(env, wrap);
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
name = name.ToString(),
profile_id = profile_id,
env](Environment* worker_env) mutable {
bool found = false;
auto json_out_stream = std::make_unique<node::JSONOutputStream>();
CpuProfile* profile = worker_env->StopCpuProfile(name);
CpuProfile* profile = worker_env->StopCpuProfile(profile_id);
if (profile) {
profile->Serialize(json_out_stream.get(),
CpuProfile::SerializationFormat::kJSON);

View File

@ -8,31 +8,9 @@ const worker = new Worker(`
parentPort.on('message', () => {});
`, { eval: true });
[
-1,
1.1,
NaN,
undefined,
{},
[],
null,
function() {},
Symbol(),
true,
Infinity,
].forEach((name) => {
try {
worker.startCpuProfile(name);
} catch (e) {
assert.ok(/ERR_INVALID_ARG_TYPE/i.test(e.code));
}
});
const name = 'demo';
worker.on('online', common.mustCall(async () => {
{
const handle = await worker.startCpuProfile(name);
const handle = await worker.startCpuProfile();
JSON.parse(await handle.stop());
// Stop again
JSON.parse(await handle.stop());
@ -40,8 +18,8 @@ worker.on('online', common.mustCall(async () => {
{
const [handle1, handle2] = await Promise.all([
worker.startCpuProfile('demo1'),
worker.startCpuProfile('demo2'),
worker.startCpuProfile(),
worker.startCpuProfile(),
]);
const [profile1, profile2] = await Promise.all([
handle1.stop(),
@ -52,22 +30,14 @@ worker.on('online', common.mustCall(async () => {
}
{
// Calling startCpuProfile twice with same name will throw an error
await worker.startCpuProfile(name);
try {
await worker.startCpuProfile(name);
} catch (e) {
assert.ok(/ERR_CPU_PROFILE_ALREADY_STARTED/i.test(e.code));
}
// Does not need to stop the profile because it will be stopped
// automatically when the worker is terminated
await worker.startCpuProfile();
// It will be stopped automatically when the worker is terminated
}
worker.terminate();
}));
worker.once('exit', common.mustCall(async () => {
await assert.rejects(worker.startCpuProfile(name), {
await assert.rejects(worker.startCpuProfile(), {
code: 'ERR_WORKER_NOT_RUNNING'
});
}));

View File

@ -344,6 +344,7 @@ const customTypesMap = {
'Lock': 'worker_threads.html#class-lock',
'LockManager': 'worker_threads.html#class-lockmanager',
'LockManagerSnapshot': 'https://developer.mozilla.org/en-US/docs/Web/API/LockManagerSnapshot',
'CPUProfileHandle': 'v8.html#class-cpuprofilehandle',
};
const arrayPart = /(?:\[])+$/;

View File

@ -17,12 +17,17 @@ declare namespace InternalWorkerBinding {
takeHeapSnapshot(): object;
getHeapStatistics(): Promise<object>;
cpuUsage(): Promise<object>;
startCpuProfile(name): Promise<object>;
startCpuProfile(): Promise<CPUProfileHandle>;
loopIdleTime(): number;
loopStartTime(): number;
}
}
/**
 * Handle for an in-progress CPU profile, as returned by
 * `worker.startCpuProfile()`. Supports `await using` via `Symbol.asyncDispose`.
 */
export interface CPUProfileHandle {
// Stops collecting; fulfills with the serialized profile (a JSON string).
stop(): Promise<string>;
// Async-disposal hook: stops the profile; the resulting data is discarded.
[Symbol.asyncDispose](): Promise<void>;
}
export interface WorkerBinding {
Worker: typeof InternalWorkerBinding.Worker;
getEnvMessagePort(): InternalMessagingBinding.MessagePort;