@@ -200,7 +200,7 @@ class BufferFinalizer : private Finalizer {
200200 ~BufferFinalizer () { env ()->Unref (); }
201201};
202202
203- class ThreadSafeFunction : public node ::AsyncResource {
203+ class ThreadSafeFunction {
204204 public:
205205 ThreadSafeFunction (v8::Local<v8::Function> func,
206206 v8::Local<v8::Object> resource,
@@ -212,11 +212,13 @@ class ThreadSafeFunction : public node::AsyncResource {
212212 void * finalize_data_,
213213 napi_finalize finalize_cb_,
214214 napi_threadsafe_function_call_js call_js_cb_)
215- : AsyncResource(env_->isolate,
216- resource,
217- node::Utf8Value (env_->isolate, name).ToStringView()),
215+ : async_resource(std::in_place,
216+ env_->isolate,
217+ resource,
218+ node::Utf8Value (env_->isolate, name).ToStringView()),
218219 thread_count(thread_count_),
219220 is_closing(false ),
221+ is_closed(false ),
220222 dispatch_state(kDispatchIdle ),
221223 context(context_),
222224 max_queue_size(max_queue_size_),
@@ -230,36 +232,38 @@ class ThreadSafeFunction : public node::AsyncResource {
230232 env->Ref ();
231233 }
232234
233- ~ThreadSafeFunction () override {
234- node::RemoveEnvironmentCleanupHook (env->isolate , Cleanup, this );
235- env->Unref ();
236- }
235+ ~ThreadSafeFunction () { ReleaseResources (); }
237236
238237 // These methods can be called from any thread.
239238
240239 napi_status Push (void * data, napi_threadsafe_function_call_mode mode) {
241- node::Mutex::ScopedLock lock (this ->mutex );
240+ {
241+ node::Mutex::ScopedLock lock (this ->mutex );
242242
243- while (queue.size () >= max_queue_size && max_queue_size > 0 &&
244- !is_closing) {
245- if (mode == napi_tsfn_nonblocking) {
246- return napi_queue_full;
243+ while (queue.size () >= max_queue_size && max_queue_size > 0 &&
244+ !is_closing) {
245+ if (mode == napi_tsfn_nonblocking) {
246+ return napi_queue_full;
247+ }
248+ cond->Wait (lock);
247249 }
248- cond->Wait (lock);
249- }
250250
251- if (is_closing) {
251+ if (!is_closing) {
252+ queue.push (data);
253+ Send ();
254+ return napi_ok;
255+ }
252256 if (thread_count == 0 ) {
253257 return napi_invalid_arg;
254- } else {
255- thread_count--;
258+ }
259+ thread_count--;
260+ if (!is_closed || thread_count > 0 ) {
256261 return napi_closing;
257262 }
258- } else {
259- queue.push (data);
260- Send ();
261- return napi_ok;
262263 }
264+ // Make sure to release lock before destroying
265+ delete this ;
266+ return napi_closing;
263267 }
264268
265269 napi_status Acquire () {
@@ -275,31 +279,51 @@ class ThreadSafeFunction : public node::AsyncResource {
275279 }
276280
277281 napi_status Release (napi_threadsafe_function_release_mode mode) {
278- node::Mutex::ScopedLock lock (this ->mutex );
282+ {
283+ node::Mutex::ScopedLock lock (this ->mutex );
279284
280- if (thread_count == 0 ) {
281- return napi_invalid_arg;
282- }
285+ if (thread_count == 0 ) {
286+ return napi_invalid_arg;
287+ }
283288
284- thread_count--;
289+ thread_count--;
285290
286- if (thread_count == 0 || mode == napi_tsfn_abort) {
287- if (!is_closing) {
288- is_closing = (mode == napi_tsfn_abort);
289- if (is_closing && max_queue_size > 0 ) {
290- cond->Signal (lock);
291+ if (thread_count == 0 || mode == napi_tsfn_abort) {
292+ if (!is_closing) {
293+ is_closing = (mode == napi_tsfn_abort);
294+ if (is_closing && max_queue_size > 0 ) {
295+ cond->Signal (lock);
296+ }
297+ Send ();
291298 }
292- Send ();
293299 }
294- }
295300
301+ if (!is_closed || thread_count > 0 ) {
302+ return napi_ok;
303+ }
304+ }
305+ // Make sure to release lock before destroying
306+ delete this ;
296307 return napi_ok;
297308 }
298309
299- void EmptyQueueAndDelete () {
300- for (; !queue.empty (); queue.pop ()) {
301- call_js_cb (nullptr , nullptr , context, queue.front ());
310+ void EmptyQueueAndMaybeDelete () {
311+ {
312+ node::Mutex::ScopedLock lock (this ->mutex );
313+ for (; !queue.empty (); queue.pop ()) {
314+ call_js_cb (nullptr , nullptr , context, queue.front ());
315+ }
316+ if (thread_count > 0 ) {
317+ // At this point this TSFN is effectively done, but we need to keep
318+ // it alive for other threads that still have pointers to it until
319+ // they release them.
320+ // But we already release all the resources that we can at this point
321+ queue = {};
322+ ReleaseResources ();
323+ return ;
324+ }
302325 }
326+ // Make sure to release lock before destroying
303327 delete this ;
304328 }
305329
@@ -351,6 +375,16 @@ class ThreadSafeFunction : public node::AsyncResource {
351375 inline void * Context () { return context; }
352376
353377 protected:
378+ void ReleaseResources () {
379+ if (!is_closed) {
380+ is_closed = true ;
381+ ref.Reset ();
382+ node::RemoveEnvironmentCleanupHook (env->isolate , Cleanup, this );
383+ env->Unref ();
384+ async_resource.reset ();
385+ }
386+ }
387+
354388 void Dispatch () {
355389 bool has_more = true ;
356390
@@ -409,7 +443,7 @@ class ThreadSafeFunction : public node::AsyncResource {
409443
410444 if (popped_value) {
411445 v8::HandleScope scope (env->isolate );
412- CallbackScope cb_scope (this );
446+ AsyncResource:: CallbackScope cb_scope (&*async_resource );
413447 napi_value js_callback = nullptr ;
414448 if (!ref.IsEmpty ()) {
415449 v8::Local<v8::Function> js_cb =
@@ -426,10 +460,10 @@ class ThreadSafeFunction : public node::AsyncResource {
426460 void Finalize () {
427461 v8::HandleScope scope (env->isolate );
428462 if (finalize_cb) {
429- CallbackScope cb_scope (this );
463+ AsyncResource:: CallbackScope cb_scope (&*async_resource );
430464 env->CallFinalizer <false >(finalize_cb, finalize_data, context);
431465 }
432- EmptyQueueAndDelete ();
466+ EmptyQueueAndMaybeDelete ();
433467 }
434468
435469 void CloseHandlesAndMaybeDelete (bool set_closing = false ) {
@@ -501,19 +535,29 @@ class ThreadSafeFunction : public node::AsyncResource {
501535 }
502536
503537 private:
538+ // Needed because node::AsyncResource::CallbackScope is protected
539+ class AsyncResource : public node ::AsyncResource {
540+ public:
541+ using node::AsyncResource::AsyncResource;
542+ using node::AsyncResource::CallbackScope;
543+ };
544+
504545 static const unsigned char kDispatchIdle = 0 ;
505546 static const unsigned char kDispatchRunning = 1 << 0 ;
506547 static const unsigned char kDispatchPending = 1 << 1 ;
507548
508549 static const unsigned int kMaxIterationCount = 1000 ;
509550
551+ std::optional<AsyncResource> async_resource;
552+
510553 // These are variables protected by the mutex.
511554 node::Mutex mutex;
512555 std::unique_ptr<node::ConditionVariable> cond;
513556 std::queue<void *> queue;
514557 uv_async_t async;
515558 size_t thread_count;
516559 bool is_closing;
560+ bool is_closed;
517561 std::atomic_uchar dispatch_state;
518562
519563 // These are variables set once, upon creation, and then never again, which
0 commit comments