Skip to content

Commit 46a82b7

Browse files
committed
fix: execute tsfn finalizer after queue drains when aborted (nodejs/node#61956)
1 parent cafd29b commit 46a82b7

File tree

6 files changed

+133
-4
lines changed

6 files changed

+133
-4
lines changed

packages/emnapi/src/threadsafe-function.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ const emnapiTSFN = {
524524
emnapiTSFN.releaseResources(func)
525525
_free(to64('func') as number)
526526
},
527-
emptyQueueAndMaybeDelete (func: number) {
527+
emptyQueue (func: number) {
528528
const drainQueue: number[] = []
529529
emnapiTSFN.getMutex(func).execute(() => {
530530
while (emnapiTSFN.getQueueSize(func) > 0) {
@@ -541,6 +541,8 @@ const emnapiTSFN = {
541541
makeDynCall('vpppp', 'callJsCb')(to64('0'), to64('0'), to64('context'), to64('data'))
542542
}
543543
}
544+
},
545+
maybeDelete (func: number) {
544546
let shouldDelete = false
545547
emnapiTSFN.getMutex(func).execute(() => {
546548
if (emnapiTSFN.getThreadCount(func) > 0) {
@@ -574,6 +576,7 @@ const emnapiTSFN = {
574576
}
575577

576578
try {
579+
emnapiTSFN.emptyQueue(func)
577580
if (finalize) {
578581
if (emnapiNodeBinding) {
579582
const resource = emnapiTSFN.getResource(func)
@@ -590,7 +593,7 @@ const emnapiTSFN = {
590593
f()
591594
}
592595
}
593-
emnapiTSFN.emptyQueueAndMaybeDelete(func)
596+
emnapiTSFN.maybeDelete(func)
594597
} finally {
595598
emnapiCtx.closeScope(envObject)
596599
}

packages/emnapi/src/threadsafe_function.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ static napi_status _emnapi_tsfn_init(napi_threadsafe_function func) {
167167
return napi_generic_failure;
168168
}
169169

170-
static void _emnapi_tsfn_empty_queue_and_maybe_delete(napi_threadsafe_function func) {
170+
static void _emnapi_tsfn_empty_queue(napi_threadsafe_function func) {
171171
struct uv__queue drain_queue;
172172
uv__queue_init(&drain_queue);
173173
pthread_mutex_lock(&func->mutex);
@@ -186,6 +186,9 @@ static void _emnapi_tsfn_empty_queue_and_maybe_delete(napi_threadsafe_function f
186186
uv__queue_init(q);
187187
free(node);
188188
}
189+
}
190+
191+
static void _emnapi_tsfn_maybe_delete(napi_threadsafe_function func) {
189192
pthread_mutex_lock(&func->mutex);
190193
if (func->thread_count > 0) {
191194
_emnapi_tsfn_release_resources(func);
@@ -207,6 +210,7 @@ static napi_value _emnapi_tsfn_finalize_in_callback_scope(napi_env env, napi_cal
207210

208211
static void _emnapi_tsfn_finalize(napi_threadsafe_function func) {
209212
napi_handle_scope scope = _emnapi_open_handle_scope();
213+
_emnapi_tsfn_empty_queue(func);
210214
if (func->finalize_cb) {
211215
if (emnapi_is_node_binding_available()) {
212216
napi_value resource, cb;
@@ -224,7 +228,7 @@ static void _emnapi_tsfn_finalize(napi_threadsafe_function func) {
224228
_emnapi_call_finalizer(0, func->env, func->finalize_cb, func->finalize_data, func->context);
225229
}
226230
}
227-
_emnapi_tsfn_empty_queue_and_maybe_delete(func);
231+
_emnapi_tsfn_maybe_delete(func);
228232
_emnapi_close_handle_scope(scope);
229233
}
230234

packages/test/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,8 @@ if((NOT IS_WASM) OR IS_EMSCRIPTEN OR IS_WASI_THREADS)
303303
add_test("pool" "./pool/binding.c" ON)
304304
add_test("tsfn" "./tsfn/binding.c" ON)
305305
add_test("tsfn_shutdown" "./tsfn_shutdown/binding.cc" ON)
306+
add_test("tsfn_abort" "./tsfn_abort/binding.cc" ON)
307+
target_compile_options("tsfn_abort" PRIVATE "-fno-rtti")
306308
add_test("async_cleanup_hook" "./async_cleanup_hook/binding.c" ON)
307309
add_test("uv_threadpool_size" "./uv_threadpool_size/binding.c" ON)
308310
add_test("trap_in_thread" "./trap_in_thread/binding.c" ON)

packages/test/script/test.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const pthread = [
2222
'pool/**/*',
2323
'tsfn/**/*',
2424
'tsfn_shutdown/**/*',
25+
'tsfn_abort/**/*',
2526
'async_cleanup_hook/**/*',
2627
'string/string-pthread.test.js',
2728
'uv_threadpool_size/**/*',
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#include <js_native_api.h>
2+
#include <node_api.h>
3+
#include <node_api_types.h>
4+
5+
#include <cstdio>
6+
#include <cstdlib>
7+
#include <functional>
8+
#include <type_traits>
9+
10+
template <typename R, auto func, typename... Args>
11+
inline auto call(const char* name, Args&&... args) -> R {
12+
napi_status status;
13+
if constexpr (std::is_same_v<R, void>) {
14+
status = func(std::forward<Args>(args)...);
15+
if (status == napi_ok) {
16+
return;
17+
}
18+
} else {
19+
R ret;
20+
status = func(std::forward<Args>(args)..., &ret);
21+
if (status == napi_ok) {
22+
return ret;
23+
}
24+
}
25+
std::fprintf(stderr, "%s: %d\n", name, status);
26+
std::abort();
27+
}
28+
29+
#define NAPI_CALL(ret_type, func, ...) \
30+
call<ret_type, func>(#func, ##__VA_ARGS__)
31+
32+
class Context {
33+
public:
34+
~Context() {
35+
std::fprintf(stderr, "Context: destructor called\n");
36+
get = [](void*) {
37+
std::abort();
38+
return 0;
39+
};
40+
}
41+
42+
std::function<int*(int)> create = [](int value) {
43+
std::fprintf(stderr, "Context: create called\n");
44+
return new int(value);
45+
};
46+
47+
std::function<int(void*)> get = [](void* ptr) {
48+
std::fprintf(stderr, "Context: get called\n");
49+
return *static_cast<int*>(ptr);
50+
};
51+
52+
std::function<void(void*)> deleter = [](void* ptr) {
53+
std::fprintf(stderr, "Context: deleter called\n");
54+
delete static_cast<int*>(ptr);
55+
};
56+
};
57+
58+
void tsfn_callback(napi_env env, napi_value js_cb, void* ctx_p, void* data) {
59+
auto ctx = static_cast<Context*>(ctx_p);
60+
std::fprintf(stderr, "tsfn_callback: env=%p data=%d\n", env, ctx->get(data));
61+
ctx->deleter(data);
62+
}
63+
64+
void tsfn_finalize(napi_env env, void* finalize_data, void* finalize_hint) {
65+
auto ctx = static_cast<Context*>(finalize_hint);
66+
std::fprintf(stderr,
67+
"tsfn_finalize: env=%p finalize_data=%p finalize_hint=%p\n",
68+
env,
69+
finalize_data,
70+
finalize_hint);
71+
delete ctx;
72+
}
73+
74+
auto run(napi_env env, napi_callback_info info) -> napi_value {
75+
auto global = NAPI_CALL(napi_value, napi_get_global, env);
76+
auto undefined = NAPI_CALL(napi_value, napi_get_undefined, env);
77+
auto ctx = new Context();
78+
auto tsfn = NAPI_CALL(napi_threadsafe_function,
79+
napi_create_threadsafe_function,
80+
env,
81+
nullptr,
82+
global,
83+
undefined,
84+
0,
85+
1 /* initial_thread_count */,
86+
nullptr,
87+
tsfn_finalize,
88+
ctx,
89+
tsfn_callback);
90+
91+
NAPI_CALL(void,
92+
napi_call_threadsafe_function,
93+
tsfn,
94+
ctx->create(1),
95+
napi_tsfn_blocking);
96+
97+
NAPI_CALL(void, napi_unref_threadsafe_function, env, tsfn);
98+
99+
NAPI_CALL(void,
100+
napi_release_threadsafe_function,
101+
tsfn,
102+
napi_threadsafe_function_release_mode::napi_tsfn_abort);
103+
return NAPI_CALL(napi_value, napi_get_undefined, env);
104+
}
105+
106+
napi_value init(napi_env env, napi_value exports) {
107+
return NAPI_CALL(
108+
napi_value, napi_create_function, env, nullptr, 0, run, nullptr);
109+
}
110+
111+
NAPI_MODULE(NODE_GYP_MODULE_NAME, init)
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
'use strict'
2+
const { load } = require('../util')
3+
4+
module.exports = new Promise((resolve, reject) => {
5+
load('tsfn_abort', { nodeBinding: require('@emnapi/node-binding') }).then((binding) => {
6+
binding()
7+
}).catch(reject)
8+
})

0 commit comments

Comments
 (0)