Skip to content

Commit 3019ab6

Browse files
committed
worker: add Notification API
1 parent 5c38edf commit 3019ab6

File tree

10 files changed

+405
-1
lines changed

10 files changed

+405
-1
lines changed

doc/api/worker_threads.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,34 @@ this matches its values.
654654
655655
If this is used in the main thread, its value is an empty object.
656656
657+
## `worker.getNotifications()`
658+
659+
<!-- YAML
660+
added: REPLACEME
661+
-->
662+
663+
* Returns: {Array<bigint>} A list of notifications registered in the current thread.
664+
665+
Return the list of all notifications registered in the current thread via
666+
[`worker.registerNotification(callback)`](#workerregisternotificationcallback).
667+
668+
## `worker.registerNotification(callback)`
669+
670+
<!-- YAML
671+
added: REPLACEME
672+
-->
673+
674+
* `callback` {Function} A function to execute when the notification is received.
675+
* Returns: {bigint} The notification ID
676+
677+
Register a callback associated to a notification.
678+
679+
A notification is a number which can be safely transferred and used in other threads to trigger
680+
the associated callback via [`sendNotification(id`)](#workersendnotificationid).
681+
682+
The notification must be unregistered using [`unregisterNotification(id)`](#workerunregisternotificationid)
683+
or [`unregisterNotifications()`](#workerunregisternotifications).
684+
657685
## `worker.SHARE_ENV`
658686
659687
<!-- YAML
@@ -685,6 +713,17 @@ new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
685713
});
686714
```
687715
716+
## `worker.sendNotification(id)`
717+
718+
* `id` {bigint}: The notification ID
719+
720+
Triggers the notification previously registered via [`worker.registerNotification(callback)`](#workerregisternotificationcallback).
721+
722+
The callback in receiving thread will invoked via [`setImmediate`](./timers.md#setimmediatecallback-args).
723+
724+
It will throw an error if the notification is not found. It is safe to invoke a notification
725+
registered in the same thread.
726+
688727
## `worker.setEnvironmentData(key[, value])`
689728
690729
<!-- YAML
@@ -732,6 +771,27 @@ added: v24.6.0
732771
A string identifier for the current thread or null if the thread is not running.
733772
On the corresponding worker object (if there is any), it is available as [`worker.threadName`][].
734773
774+
## `worker.unregisterNotification(id)`
775+
776+
<!-- YAML
777+
added: REPLACEME
778+
-->
779+
780+
* `id` {number}
781+
782+
Unregister a notification previously registered with [`worker.registerNotification(callback)`](#workerregisternotificationcallback).
783+
784+
It will throw an error if the notification is not found. The notification must be unregistered from the same
785+
worker thread that registered it.
786+
787+
## `worker.unregisterNotifications()`
788+
789+
<!-- YAML
790+
added: REPLACEME
791+
-->
792+
793+
Unregister all the notifications previously registered with [`worker.registerNotification(callback)`](#workerregisternotificationcallback).
794+
735795
## `worker.workerData`
736796
737797
<!-- YAML

lib/internal/worker.js

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const {
55
ArrayPrototypeMap,
66
ArrayPrototypePush,
77
AtomicsAdd,
8+
BigInt,
89
Float64Array,
910
FunctionPrototypeBind,
1011
MathMax,
@@ -43,6 +44,7 @@ const {
4344
ERR_INVALID_ARG_TYPE,
4445
ERR_INVALID_ARG_VALUE,
4546
ERR_OPERATION_FAILED,
47+
ERR_OUT_OF_RANGE,
4648
} = errorCodes;
4749

4850
const workerIo = require('internal/worker/io');
@@ -63,7 +65,13 @@ const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker
6365
const { deserializeError } = require('internal/error_serdes');
6466
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
6567
const { kEmptyObject } = require('internal/util');
66-
const { validateArray, validateString, validateObject, validateNumber } = require('internal/validators');
68+
const {
69+
validateArray,
70+
validateString,
71+
validateObject,
72+
validateNumber,
73+
validateFunction,
74+
} = require('internal/validators');
6775
const {
6876
throwIfBuildingSnapshot,
6977
} = require('internal/v8/startup_snapshot');
@@ -80,6 +88,11 @@ const {
8088
kCodeRangeSizeMb,
8189
kStackSizeMb,
8290
kTotalResourceLimitCount,
91+
getNotifications,
92+
registerNotification: registerNotificationInternal,
93+
sendNotification: sendNotificationInternal,
94+
unregisterNotification: unregisterNotificationInternal,
95+
unregisterNotifications,
8396
} = internalBinding('worker');
8497

8598
const kHandle = Symbol('kHandle');
@@ -620,6 +633,37 @@ function eventLoopUtilization(util1, util2) {
620633
);
621634
}
622635

636+
637+
function registerNotification(listener) {
638+
validateFunction(listener, 'listener');
639+
return registerNotificationInternal(listener);
640+
}
641+
642+
function unregisterNotification(id) {
643+
if (typeof id !== 'number' && typeof id !== 'bigint') {
644+
throw new ERR_INVALID_ARG_TYPE('id', ['number', 'bigint'], id);
645+
}
646+
647+
if (id < 0) {
648+
throw new ERR_OUT_OF_RANGE('id', `>= 1`, id);
649+
}
650+
651+
unregisterNotificationInternal(BigInt(id));
652+
}
653+
654+
function sendNotification(id) {
655+
if (typeof id !== 'number' && typeof id !== 'bigint') {
656+
throw new ERR_INVALID_ARG_TYPE('id', ['number', 'bigint'], id);
657+
}
658+
659+
if (id < 0) {
660+
throw new ERR_OUT_OF_RANGE('id', `>= 1`, id);
661+
}
662+
663+
sendNotificationInternal(BigInt(id));
664+
}
665+
666+
623667
module.exports = {
624668
ownsProcessState,
625669
kIsOnline,
@@ -635,4 +679,9 @@ module.exports = {
635679
threadName,
636680
InternalWorker,
637681
Worker,
682+
getNotifications,
683+
registerNotification,
684+
sendNotification,
685+
unregisterNotification,
686+
unregisterNotifications,
638687
};

lib/worker_threads.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ const {
1010
threadId,
1111
threadName,
1212
Worker,
13+
getNotifications,
14+
registerNotification,
15+
sendNotification,
16+
unregisterNotification,
17+
unregisterNotifications,
1318
} = require('internal/worker');
1419

1520
const {
@@ -54,4 +59,9 @@ module.exports = {
5459
setEnvironmentData,
5560
getEnvironmentData,
5661
locks,
62+
getNotifications,
63+
registerNotification,
64+
sendNotification,
65+
unregisterNotification,
66+
unregisterNotifications,
5767
};

src/env.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ class NODE_EXTERN_PRIVATE IsolateData : public MemoryRetainer {
199199

200200
size_t max_young_gen_size = 1;
201201
std::unordered_map<const char*, v8::Eternal<v8::String>> static_str_map;
202+
std::set<uint64_t> worker_notifications_;
202203

203204
inline v8::Isolate* isolate() const;
204205
IsolateData(const IsolateData&) = delete;

src/node_errors.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ void OOMErrorHandler(const char* location, const v8::OOMDetails& details);
9999
V(ERR_INVALID_PACKAGE_CONFIG, Error) \
100100
V(ERR_INVALID_OBJECT_DEFINE_PROPERTY, TypeError) \
101101
V(ERR_INVALID_MODULE, Error) \
102+
V(ERR_INVALID_NOTIFICATION, Error) \
102103
V(ERR_INVALID_STATE, Error) \
103104
V(ERR_INVALID_THIS, TypeError) \
104105
V(ERR_INVALID_URL, TypeError) \
@@ -218,6 +219,7 @@ ERRORS_WITH_CODE(V)
218219
V(ERR_INVALID_ADDRESS, "Invalid socket address") \
219220
V(ERR_INVALID_INVOCATION, "Invalid invocation") \
220221
V(ERR_INVALID_MODULE, "No such module") \
222+
V(ERR_INVALID_NOTIFICATION, "Invalid notification") \
221223
V(ERR_INVALID_STATE, "Invalid state") \
222224
V(ERR_INVALID_THIS, "Value of \"this\" is the wrong type") \
223225
V(ERR_INVALID_URL_SCHEME, "The URL must be of scheme file:") \

src/node_worker.cc

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ using v8::HandleScope;
3434
using v8::Integer;
3535
using v8::Isolate;
3636
using v8::Local;
37+
using v8::LocalVector;
3738
using v8::Locker;
3839
using v8::Maybe;
3940
using v8::Name;
@@ -221,6 +222,8 @@ class WorkerThreadData {
221222
}
222223

223224
~WorkerThreadData() {
225+
Worker::UnregisterAllNotifications(isolate_data_.get());
226+
224227
Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
225228
Isolate* isolate;
226229
{
@@ -1281,6 +1284,127 @@ void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
12811284
args.GetReturnValue().Set(loop_start_time / 1e6);
12821285
}
12831286

1287+
void Worker::GetNotifications(const FunctionCallbackInfo<Value>& args) {
1288+
Environment* env = Environment::GetCurrent(args);
1289+
Isolate* isolate = env->isolate();
1290+
LocalVector<Value> notifications(isolate);
1291+
1292+
for (auto notification_id : env->isolate_data()->worker_notifications_) {
1293+
notifications.push_back(v8::BigInt::New(isolate, notification_id));
1294+
}
1295+
1296+
args.GetReturnValue().Set(
1297+
Array::New(isolate, notifications.data(), notifications.size()));
1298+
}
1299+
1300+
void Worker::RegisterNotification(const FunctionCallbackInfo<Value>& args) {
1301+
CHECK(args[0]->IsFunction());
1302+
1303+
Environment* env = Environment::GetCurrent(args);
1304+
Isolate* isolate = env->isolate();
1305+
NotificationData* notification =
1306+
new NotificationData(env->thread_id(), env, args[0].As<v8::Function>());
1307+
1308+
notification->Register(env->isolate_data());
1309+
args.GetReturnValue().Set(v8::BigInt::New(isolate, notification->id_));
1310+
}
1311+
1312+
void Worker::SendNotification(const FunctionCallbackInfo<Value>& args) {
1313+
CHECK(args[0]->IsBigInt());
1314+
1315+
uint64_t notification_id = args[0].As<v8::BigInt>()->Uint64Value();
1316+
NotificationData* notification = workers_notifications_[notification_id];
1317+
1318+
if (notification == nullptr) {
1319+
return THROW_ERR_INVALID_NOTIFICATION(
1320+
args.GetIsolate(), "Invalid notification %llun", notification_id);
1321+
}
1322+
1323+
uv_async_send(&notification->async_);
1324+
}
1325+
1326+
void Worker::UnregisterNotification(const FunctionCallbackInfo<Value>& args) {
1327+
CHECK(args[0]->IsBigInt());
1328+
1329+
Environment* env = Environment::GetCurrent(args);
1330+
uint64_t notification_id = args[0].As<v8::BigInt>()->Uint64Value();
1331+
NotificationData* notification = workers_notifications_[notification_id];
1332+
1333+
if (notification == nullptr || notification->thread_id_ != env->thread_id()) {
1334+
return THROW_ERR_INVALID_NOTIFICATION(
1335+
args.GetIsolate(), "Invalid notification %un", notification_id);
1336+
}
1337+
1338+
notification->Unregister(env->isolate_data());
1339+
}
1340+
1341+
void Worker::UnregisterNotifications(const FunctionCallbackInfo<Value>& args) {
1342+
UnregisterAllNotifications(Environment::GetCurrent(args)->isolate_data());
1343+
}
1344+
1345+
void Worker::UnregisterAllNotifications(IsolateData* isolate_data) {
1346+
for (auto notification_id : isolate_data->worker_notifications_) {
1347+
workers_notifications_[notification_id]->Unregister(isolate_data);
1348+
}
1349+
1350+
// Wait for the uv_close callbacks to trigger
1351+
if (!isolate_data->worker_notifications_.empty()) {
1352+
uv_run(isolate_data->event_loop(), UV_RUN_ONCE);
1353+
}
1354+
1355+
isolate_data->worker_notifications_.clear();
1356+
}
1357+
1358+
NotificationData::NotificationData(uint64_t thread_id,
1359+
Environment* env,
1360+
v8::Local<v8::Function> callback)
1361+
: id_(++notifications_counter_), thread_id_(thread_id), env_(env) {
1362+
this->async_.data = this;
1363+
this->callback_.Reset(env->isolate(), callback);
1364+
}
1365+
1366+
NotificationData::~NotificationData() {
1367+
workers_notifications_.erase(this->id_);
1368+
1369+
if (!this->callback_.IsEmpty()) {
1370+
this->callback_.Reset();
1371+
}
1372+
}
1373+
1374+
void NotificationData::Register(IsolateData* isolate_data) {
1375+
isolate_data->worker_notifications_.insert(this->id_);
1376+
workers_notifications_.emplace(this->id_, this);
1377+
1378+
uv_async_init(env_->event_loop(), &this->async_, [](uv_async_t* handle) {
1379+
reinterpret_cast<NotificationData*>(handle->data)->Execute();
1380+
});
1381+
}
1382+
1383+
void NotificationData::Unregister(IsolateData* isolate_data) {
1384+
uv_close(reinterpret_cast<uv_handle_t*>(&this->async_),
1385+
[](uv_handle_t* handle) {
1386+
NotificationData* notification =
1387+
reinterpret_cast<NotificationData*>(handle->data);
1388+
delete notification;
1389+
});
1390+
1391+
// Wait for the uv_close callbacks to trigger
1392+
uv_run(isolate_data->event_loop(), UV_RUN_ONCE);
1393+
1394+
isolate_data->worker_notifications_.erase(this->id_);
1395+
workers_notifications_.erase(this->id_);
1396+
}
1397+
1398+
void NotificationData::Execute() {
1399+
this->env_->SetImmediate(
1400+
[this](Environment* env) {
1401+
v8::Isolate* isolate = this->env_->isolate();
1402+
this->callback_.Get(isolate)->Call(
1403+
isolate->GetCurrentContext(), Null(isolate), 0, nullptr);
1404+
},
1405+
CallbackFlags::kUnrefed);
1406+
}
1407+
12841408
namespace {
12851409

12861410
// Return the MessagePort that is global for this Environment and communicates
@@ -1380,6 +1504,19 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
13801504
}
13811505

13821506
SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
1507+
1508+
SetMethod(isolate, target, "getNotifications", Worker::GetNotifications);
1509+
SetMethod(
1510+
isolate, target, "registerNotification", Worker::RegisterNotification);
1511+
SetMethod(isolate, target, "sendNotification", Worker::SendNotification);
1512+
SetMethod(isolate,
1513+
target,
1514+
"unregisterNotification",
1515+
Worker::UnregisterNotification);
1516+
SetMethod(isolate,
1517+
target,
1518+
"unregisterNotifications",
1519+
Worker::UnregisterNotifications);
13831520
}
13841521

13851522
void CreateWorkerPerContextProperties(Local<Object> target,
@@ -1458,6 +1595,11 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
14581595
registry->Register(Worker::CpuUsage);
14591596
registry->Register(Worker::StartCpuProfile);
14601597
registry->Register(Worker::StopCpuProfile);
1598+
registry->Register(Worker::GetNotifications);
1599+
registry->Register(Worker::RegisterNotification);
1600+
registry->Register(Worker::SendNotification);
1601+
registry->Register(Worker::UnregisterNotification);
1602+
registry->Register(Worker::UnregisterNotifications);
14611603
}
14621604

14631605
} // anonymous namespace

0 commit comments

Comments
 (0)