Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -2045,6 +2045,16 @@ valid.
The imported module string is an invalid URL, package name, or package subpath
specifier.

<a id="ERR_INVALID_NOTIFICATION"></a>

### `ERR_INVALID_NOTIFICATION`

<!-- YAML
added: REPLACEME
-->

The notification ID is not valid.

<a id="ERR_INVALID_OBJECT_DEFINE_PROPERTY"></a>

### `ERR_INVALID_OBJECT_DEFINE_PROPERTY`
Expand Down
61 changes: 61 additions & 0 deletions doc/api/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -2328,6 +2328,19 @@ The references returned by `process.getBuiltinModule(id)` always point to
the built-in module corresponding to `id` even if users modify
[`require.cache`][] so that `require(id)` returns something else.

## `process.getNotifications()`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* Returns: {Array<bigint>} A list of notifications registered in the current thread.

Return the list of all notifications registered in the current thread via
[`process.registerNotification(callback)`](#processregisternotificationcallback).

## `process.getegid()`

<!-- YAML
Expand Down Expand Up @@ -3100,6 +3113,21 @@ the [`'warning'` event][process_warning] and the
[`emitWarning()` method][process_emit_warning] for more information about this
flag's behavior.

## `process.notify(id)`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `id` {bigint}: The notification ID

Triggers the notification previously registered via [`process.registerNotification(callback)`](#processregisternotificationcallback).

It will throw an error if the notification is not found. It is safe to invoke a notification
registered in the same thread.

## `process.permission`

<!-- YAML
Expand Down Expand Up @@ -3253,6 +3281,24 @@ This pattern, however, is being deprecated in favor of the "Refable protocol"
in order to better support Web Platform API types whose APIs cannot be modified
to add `ref()` and `unref()` methods but still need to support that behavior.

## `process.registerNotification(callback)`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `callback` {Function} A function to execute when the notification is received.
* Returns: {bigint} The notification ID

Register a callback associated to a notification.

A notification is a number which can be safely transferred and used in other threads to trigger
the associated callback via [`process.notify(id`)](#processnotifyid).

The notification can be unregistered using [`process.unregisterNotification(id)`](#processunregisternotificationid).

## `process.release`

<!-- YAML
Expand Down Expand Up @@ -4340,6 +4386,21 @@ This pattern, however, is being deprecated in favor of the "Refable protocol"
in order to better support Web Platform API types whose APIs cannot be modified
to add `ref()` and `unref()` methods but still need to support that behavior.

## `process.unregisterNotification(id)`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `id` {number}

Unregister a notification previously registered with [`process.registerNotification(callback)`](#processregisternotificationcallback).

It will throw an error if the notification is not found. The notification must be unregistered from the same thread
that registered it.

## `process.uptime()`

<!-- YAML
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/bootstrap/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ const rawMethods = internalBinding('process_methods');
process.execve = wrapped.execve;
process.ref = perThreadSetup.ref;
process.unref = perThreadSetup.unref;
process.getNotifications = wrapped.getNotifications;
process.registerNotification = wrapped.registerNotification;
process.unregisterNotification = wrapped.unregisterNotification;
process.notify = wrapped.notify;

let finalizationMod;
ObjectDefineProperty(process, 'finalization', {
Expand Down
44 changes: 44 additions & 0 deletions lib/internal/process/per_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ const {
ArrayPrototypeMap,
ArrayPrototypePush,
ArrayPrototypeSplice,
BigInt,
BigUint64Array,
Float64Array,
FunctionPrototypeCall,
NumberIsInteger,
NumberMAX_SAFE_INTEGER,
ObjectDefineProperty,
ObjectEntries,
Expand Down Expand Up @@ -49,6 +51,7 @@ const { emitExperimentalWarning } = require('internal/util');
const format = require('internal/util/inspect').format;
const {
validateArray,
validateFunction,
validateNumber,
validateObject,
validateString,
Expand Down Expand Up @@ -113,6 +116,10 @@ function wrapProcessMethods(binding) {
resourceUsage: _resourceUsage,
loadEnvFile: _loadEnvFile,
execve: _execve,
getNotifications,
registerNotification: _registerNotification,
sendNotification,
unregisterNotification: _unregisterNotification,
} = binding;

function _rawDebug(...args) {
Expand Down Expand Up @@ -365,6 +372,39 @@ function wrapProcessMethods(binding) {
}
}

function notify(id) {
if (typeof id !== 'number' && typeof id !== 'bigint') {
throw new ERR_INVALID_ARG_TYPE('id', ['number', 'bigint'], id);
}

if (typeof id === 'number' && !NumberIsInteger(id)) {
throw new ERR_OUT_OF_RANGE('id', 'an integer', id);
} else if (id < 1) {
throw new ERR_OUT_OF_RANGE('id', `>= 1`, id);
}

sendNotification(BigInt(id));
}

function registerNotification(listener) {
validateFunction(listener, 'listener');
return _registerNotification(listener);
}

function unregisterNotification(id) {
if (typeof id !== 'number' && typeof id !== 'bigint') {
throw new ERR_INVALID_ARG_TYPE('id', ['number', 'bigint'], id);
}

if (typeof id === 'number' && !NumberIsInteger(id)) {
throw new ERR_OUT_OF_RANGE('id', 'an integer', id);
} else if (id < 1) {
throw new ERR_OUT_OF_RANGE('id', `>= 1`, id);
}

_unregisterNotification(BigInt(id));
}

return {
_rawDebug,
cpuUsage,
Expand All @@ -375,6 +415,10 @@ function wrapProcessMethods(binding) {
exit,
execve,
loadEnvFile,
getNotifications,
notify,
registerNotification,
unregisterNotification,
};
}

Expand Down
104 changes: 104 additions & 0 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1103,9 +1103,19 @@ void Environment::InitializeLibuv() {
Context::Scope context_scope(env->context());
env->RunAndClearNativeImmediates();
}));

CHECK_EQ(
0,
uv_async_init(event_loop(), &notifications_async_, [](uv_async_t* async) {
Environment* env =
ContainerOf(&Environment::notifications_async_, async);
env->DispatchNotifications();
}));

uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
uv_unref(reinterpret_cast<uv_handle_t*>(&notifications_async_));

{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
Expand Down Expand Up @@ -1215,6 +1225,7 @@ void Environment::ClosePerEnvHandles() {
close_and_finish(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
close_and_finish(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
close_and_finish(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
close_and_finish(reinterpret_cast<uv_handle_t*>(&notifications_async_));
}

void Environment::CleanupHandles() {
Expand Down Expand Up @@ -2260,4 +2271,97 @@ v8::CpuProfile* Environment::StopCpuProfile(v8::ProfilerId profile_id) {
return profile;
}

Mutex Environment::notifications_mutex_;
uint64_t Environment::next_notification_id_(0);
std::unordered_map<uint64_t, Environment*> Environment::notifications_;

void Environment::RegisterNotification(
const v8::FunctionCallbackInfo<v8::Value>& args) {
CHECK(args[0]->IsFunction());

Mutex::ScopedLock lock(notifications_mutex_);
Environment* env = Environment::GetCurrent(args);
Isolate* isolate = env->isolate();

uint64_t id = ++Environment::next_notification_id_;
notifications_.emplace(id, env);

v8::Global<v8::Function> global;
global.Reset(isolate, args[0].As<v8::Function>());
env->notifications_callbacks_.emplace(id, std::move(global));

args.GetReturnValue().Set(v8::BigInt::New(isolate, id));
}

void Environment::UnregisterNotification(
const v8::FunctionCallbackInfo<v8::Value>& args) {
CHECK(args[0]->IsBigInt());

bool lossless = false;
uint64_t id = args[0].As<v8::BigInt>()->Uint64Value(&lossless);

if (!lossless) {
std::cout << "LOSSLESS TRUE\n";
return THROW_ERR_INVALID_NOTIFICATION(
args.GetIsolate(), "Invalid notification %un", id);
}

Mutex::ScopedLock lock(notifications_mutex_);
Environment* env = Environment::GetCurrent(args);

if (env->notifications_callbacks_.erase(id) == 0) {
return THROW_ERR_INVALID_NOTIFICATION(
args.GetIsolate(), "Invalid notification %llun", id);
}

env->notifications_queue_.erase(id);
notifications_.erase(id);
}

void Environment::GetNotifications(
const v8::FunctionCallbackInfo<v8::Value>& args) {
Mutex::ScopedLock lock(notifications_mutex_);
Environment* env = Environment::GetCurrent(args);
Isolate* isolate = env->isolate();

v8::LocalVector<Value> ids(isolate);
for (auto& pairs : env->notifications_callbacks_) {
ids.push_back(v8::BigInt::New(isolate, pairs.first));
}

args.GetReturnValue().Set(Array::New(isolate, ids.data(), ids.size()));
}

void Environment::SendNotification(
const v8::FunctionCallbackInfo<v8::Value>& args) {
CHECK(args[0]->IsBigInt());

uint64_t id = args[0].As<v8::BigInt>()->Uint64Value();
Environment* env = notifications_[id];

if (env == nullptr) {
return THROW_ERR_INVALID_NOTIFICATION(
args.GetIsolate(), "Invalid notification %llun", id);
}

Mutex::ScopedLock lock(notifications_mutex_);
env->notifications_queue_.insert(id);
uv_async_send(&env->notifications_async_);
}

void Environment::DispatchNotifications() {
Isolate* isolate = this->isolate();
v8::Local<v8::Object> process = process_object();
Mutex::ScopedLock lock(notifications_mutex_);
HandleScope handle_scope(isolate);

for (auto id : notifications_queue_) {
v8::Local<v8::Function> callback =
notifications_callbacks_[id].Get(isolate);
MakeCallback(isolate, process, callback, 0, nullptr, {0, 0})
.ToLocalChecked();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid using ToLocalChecked if possible, if the callbacks throw it will crash the process. You'll want to make sure the error gets appropriately propagated.

Copy link
Contributor Author

@ShogunPanda ShogunPanda Sep 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed that. But I see no difference in how the error is propagated, I still get:

node:internal/event_target:1101
  process.nextTick(() => { throw err; });
                           ^
Error: kaboom
    at process.<anonymous> (/Volumes/DATI/Users/Shogun/Programmazione/OSS/nodejs/test/parallel/test-process-notifications-error.js:22:11)

Node.js v25.0.0-pre

Am I missing anything?

}

notifications_queue_.clear();
}
} // namespace node
17 changes: 17 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,15 @@ class Environment final : public MemoryRetainer {

v8::Global<v8::Module> temporary_required_module_facade_original;

void DispatchNotifications();

static void RegisterNotification(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void UnregisterNotification(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetNotifications(const v8::FunctionCallbackInfo<v8::Value>& args);
static void SendNotification(const v8::FunctionCallbackInfo<v8::Value>& args);

private:
inline void ThrowError(v8::Local<v8::Value> (*fun)(v8::Local<v8::String>,
v8::Local<v8::Value>),
Expand All @@ -1087,6 +1096,7 @@ class Environment final : public MemoryRetainer {
uv_prepare_t idle_prepare_handle_;
uv_check_t idle_check_handle_;
uv_async_t task_queues_async_;
uv_async_t notifications_async_;
int64_t task_queues_async_refs_ = 0;

// These may be read by ctors and should be listed before complex fields.
Expand Down Expand Up @@ -1246,6 +1256,13 @@ class Environment final : public MemoryRetainer {

v8::CpuProfiler* cpu_profiler_ = nullptr;
std::vector<v8::ProfilerId> pending_profiles_;

static Mutex notifications_mutex_;
static uint64_t next_notification_id_;
static std::unordered_map<uint64_t, Environment*> notifications_;
std::unordered_map<uint64_t, v8::Global<v8::Function>>
notifications_callbacks_;
std::set<uint64_t> notifications_queue_;
};

} // namespace node
Expand Down
2 changes: 2 additions & 0 deletions src/node_errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ void OOMErrorHandler(const char* location, const v8::OOMDetails& details);
V(ERR_INVALID_PACKAGE_CONFIG, Error) \
V(ERR_INVALID_OBJECT_DEFINE_PROPERTY, TypeError) \
V(ERR_INVALID_MODULE, Error) \
V(ERR_INVALID_NOTIFICATION, Error) \
V(ERR_INVALID_STATE, Error) \
V(ERR_INVALID_THIS, TypeError) \
V(ERR_INVALID_URL, TypeError) \
Expand Down Expand Up @@ -217,6 +218,7 @@ ERRORS_WITH_CODE(V)
V(ERR_INVALID_ADDRESS, "Invalid socket address") \
V(ERR_INVALID_INVOCATION, "Invalid invocation") \
V(ERR_INVALID_MODULE, "No such module") \
V(ERR_INVALID_NOTIFICATION, "Invalid notification") \
V(ERR_INVALID_STATE, "Invalid state") \
V(ERR_INVALID_THIS, "Value of \"this\" is the wrong type") \
V(ERR_INVALID_URL_SCHEME, "The URL must be of scheme file:") \
Expand Down
Loading
Loading