@@ -200,7 +200,7 @@ class BufferFinalizer : private Finalizer {
200200 ~BufferFinalizer () { env ()->Unref (); }
201201};
202202
203- class ThreadSafeFunction : public node ::AsyncResource {
203+ class ThreadSafeFunction {
204204 public:
205205 ThreadSafeFunction (v8::Local<v8::Function> func,
206206 v8::Local<v8::Object> resource,
@@ -212,11 +212,12 @@ class ThreadSafeFunction : public node::AsyncResource {
212212 void * finalize_data_,
213213 napi_finalize finalize_cb_,
214214 napi_threadsafe_function_call_js call_js_cb_)
215- : AsyncResource(env_->isolate,
216- resource,
217- node::Utf8Value (env_->isolate, name).ToStringView()),
215+ : async_resource(std::in_place,
216+ env_->isolate,
217+ resource,
218+ node::Utf8Value (env_->isolate, name).ToStringView()),
218219 thread_count(thread_count_),
219- is_closing( false ),
220+ state( kOpen ),
220221 dispatch_state(kDispatchIdle ),
221222 context(context_),
222223 max_queue_size(max_queue_size_),
@@ -230,76 +231,104 @@ class ThreadSafeFunction : public node::AsyncResource {
230231 env->Ref ();
231232 }
232233
233- ~ThreadSafeFunction () override {
234- node::RemoveEnvironmentCleanupHook (env->isolate , Cleanup, this );
235- env->Unref ();
236- }
234+ ~ThreadSafeFunction () { ReleaseResources (); }
237235
238236 // These methods can be called from any thread.
239237
240238 napi_status Push (void * data, napi_threadsafe_function_call_mode mode) {
241- node::Mutex::ScopedLock lock (this ->mutex );
239+ {
240+ node::Mutex::ScopedLock lock (this ->mutex );
242241
243- while (queue.size () >= max_queue_size && max_queue_size > 0 &&
244- !is_closing) {
245- if (mode == napi_tsfn_nonblocking) {
246- return napi_queue_full;
242+ while (queue.size () >= max_queue_size && max_queue_size > 0 &&
243+ state == kOpen ) {
244+ if (mode == napi_tsfn_nonblocking) {
245+ return napi_queue_full;
246+ }
247+ cond->Wait (lock);
247248 }
248- cond->Wait (lock);
249- }
250249
251- if (is_closing) {
250+ if (state == kOpen ) {
251+ queue.push (data);
252+ Send ();
253+ return napi_ok;
254+ }
252255 if (thread_count == 0 ) {
253256 return napi_invalid_arg;
254- } else {
255- thread_count--;
257+ }
258+ thread_count--;
259+ if (!(state == kClosed && thread_count == 0 )) {
256260 return napi_closing;
257261 }
258- } else {
259- queue.push (data);
260- Send ();
261- return napi_ok;
262262 }
263+ // Make sure to release lock before destroying
264+ delete this ;
265+ return napi_closing;
263266 }
264267
265268 napi_status Acquire () {
266269 node::Mutex::ScopedLock lock (this ->mutex );
267270
268- if (is_closing) {
269- return napi_closing;
270- }
271+ if (state == kOpen ) {
272+ thread_count++;
271273
272- thread_count++;
274+ return napi_ok;
275+ }
273276
274- return napi_ok ;
277+ return napi_closing ;
275278 }
276279
277280 napi_status Release (napi_threadsafe_function_release_mode mode) {
278- node::Mutex::ScopedLock lock (this ->mutex );
281+ {
282+ node::Mutex::ScopedLock lock (this ->mutex );
279283
280- if (thread_count == 0 ) {
281- return napi_invalid_arg;
282- }
284+ if (thread_count == 0 ) {
285+ return napi_invalid_arg;
286+ }
283287
284- thread_count--;
288+ thread_count--;
285289
286- if (thread_count == 0 || mode == napi_tsfn_abort) {
287- if (!is_closing) {
288- is_closing = (mode == napi_tsfn_abort);
289- if (is_closing && max_queue_size > 0 ) {
290- cond->Signal (lock);
290+ if (thread_count == 0 || mode == napi_tsfn_abort) {
291+ if (state == kOpen ) {
292+ if (mode == napi_tsfn_abort) {
293+ state = kClosing ;
294+ }
295+ if (state == kClosing && max_queue_size > 0 ) {
296+ cond->Signal (lock);
297+ }
298+ Send ();
291299 }
292- Send ();
293300 }
294- }
295301
302+ if (!(state == kClosed && thread_count == 0 )) {
303+ return napi_ok;
304+ }
305+ }
306+ // Make sure to release lock before destroying
307+ delete this ;
296308 return napi_ok;
297309 }
298310
299- void EmptyQueueAndDelete () {
300- for (; !queue.empty (); queue.pop ()) {
301- call_js_cb (nullptr , nullptr , context, queue.front ());
311+ void EmptyQueueAndMaybeDelete () {
312+ std::queue<void *> drain_queue;
313+ {
314+ node::Mutex::ScopedLock lock (this ->mutex );
315+ queue.swap (drain_queue);
302316 }
317+ for (; !drain_queue.empty (); drain_queue.pop ()) {
318+ call_js_cb (nullptr , nullptr , context, drain_queue.front ());
319+ }
320+ {
321+ node::Mutex::ScopedLock lock (this ->mutex );
322+ if (thread_count > 0 ) {
323+ // At this point this TSFN is effectively done, but we need to keep
324+ // it alive for other threads that still have pointers to it until
325+ // they release them.
326+ // But we already release all the resources that we can at this point
327+ ReleaseResources ();
328+ return ;
329+ }
330+ }
331+ // Make sure to release lock before destroying
303332 delete this ;
304333 }
305334
@@ -351,6 +380,16 @@ class ThreadSafeFunction : public node::AsyncResource {
351380 inline void * Context () { return context; }
352381
353382 protected:
383+ void ReleaseResources () {
384+ if (state != kClosed ) {
385+ state = kClosed ;
386+ ref.Reset ();
387+ node::RemoveEnvironmentCleanupHook (env->isolate , Cleanup, this );
388+ env->Unref ();
389+ async_resource.reset ();
390+ }
391+ }
392+
354393 void Dispatch () {
355394 bool has_more = true ;
356395
@@ -379,9 +418,7 @@ class ThreadSafeFunction : public node::AsyncResource {
379418
380419 {
381420 node::Mutex::ScopedLock lock (this ->mutex );
382- if (is_closing) {
383- CloseHandlesAndMaybeDelete ();
384- } else {
421+ if (state == kOpen ) {
385422 size_t size = queue.size ();
386423 if (size > 0 ) {
387424 data = queue.front ();
@@ -395,7 +432,7 @@ class ThreadSafeFunction : public node::AsyncResource {
395432
396433 if (size == 0 ) {
397434 if (thread_count == 0 ) {
398- is_closing = true ;
435+ state = kClosing ;
399436 if (max_queue_size > 0 ) {
400437 cond->Signal (lock);
401438 }
@@ -404,12 +441,14 @@ class ThreadSafeFunction : public node::AsyncResource {
404441 } else {
405442 has_more = true ;
406443 }
444+ } else {
445+ CloseHandlesAndMaybeDelete ();
407446 }
408447 }
409448
410449 if (popped_value) {
411450 v8::HandleScope scope (env->isolate );
412- CallbackScope cb_scope (this );
451+ AsyncResource:: CallbackScope cb_scope (&*async_resource );
413452 napi_value js_callback = nullptr ;
414453 if (!ref.IsEmpty ()) {
415454 v8::Local<v8::Function> js_cb =
@@ -426,17 +465,17 @@ class ThreadSafeFunction : public node::AsyncResource {
426465 void Finalize () {
427466 v8::HandleScope scope (env->isolate );
428467 if (finalize_cb) {
429- CallbackScope cb_scope (this );
468+ AsyncResource:: CallbackScope cb_scope (&*async_resource );
430469 env->CallFinalizer <false >(finalize_cb, finalize_data, context);
431470 }
432- EmptyQueueAndDelete ();
471+ EmptyQueueAndMaybeDelete ();
433472 }
434473
435474 void CloseHandlesAndMaybeDelete (bool set_closing = false ) {
436475 v8::HandleScope scope (env->isolate );
437476 if (set_closing) {
438477 node::Mutex::ScopedLock lock (this ->mutex );
439- is_closing = true ;
478+ state = kClosing ;
440479 if (max_queue_size > 0 ) {
441480 cond->Signal (lock);
442481 }
@@ -501,19 +540,30 @@ class ThreadSafeFunction : public node::AsyncResource {
501540 }
502541
503542 private:
543+ // Needed because node::AsyncResource::CallbackScope is protected
544+ class AsyncResource : public node ::AsyncResource {
545+ public:
546+ using node::AsyncResource::AsyncResource;
547+ using node::AsyncResource::CallbackScope;
548+ };
549+
550+ enum State : unsigned char { kOpen , kClosing , kClosed };
551+
504552 static const unsigned char kDispatchIdle = 0 ;
505553 static const unsigned char kDispatchRunning = 1 << 0 ;
506554 static const unsigned char kDispatchPending = 1 << 1 ;
507555
508556 static const unsigned int kMaxIterationCount = 1000 ;
509557
558+ std::optional<AsyncResource> async_resource;
559+
510560 // These are variables protected by the mutex.
511561 node::Mutex mutex;
512562 std::unique_ptr<node::ConditionVariable> cond;
513563 std::queue<void *> queue;
514564 uv_async_t async;
515565 size_t thread_count;
516- bool is_closing ;
566+ State state ;
517567 std::atomic_uchar dispatch_state;
518568
519569 // These are variables set once, upon creation, and then never again, which
0 commit comments