Skip to content

Commit 2da3023

Browse files
committed
test: Initial commit of TSFNEx threadsafe test
1 parent 6aeecd4 commit 2da3023

File tree

5 files changed

+358
-0
lines changed

5 files changed

+358
-0
lines changed

test/binding.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ Object InitThreadSafeFunction(Env env);
5151
Object InitThreadSafeFunctionExCall(Env env);
5252
Object InitThreadSafeFunctionExContext(Env env);
5353
Object InitThreadSafeFunctionExSimple(Env env);
54+
Object InitThreadSafeFunctionExThreadSafe(Env env);
5455
#endif
5556
Object InitTypedArray(Env env);
5657
Object InitObjectWrap(Env env);
@@ -111,6 +112,7 @@ Object Init(Env env, Object exports) {
111112
exports.Set("threadsafe_function_ex_call", InitThreadSafeFunctionExCall(env));
112113
exports.Set("threadsafe_function_ex_context", InitThreadSafeFunctionExContext(env));
113114
exports.Set("threadsafe_function_ex_simple", InitThreadSafeFunctionExSimple(env));
115+
exports.Set("threadsafe_function_ex_threadsafe", InitThreadSafeFunctionExThreadSafe(env));
114116
#endif
115117
exports.Set("typedarray", InitTypedArray(env));
116118
exports.Set("objectwrap", InitObjectWrap(env));

test/binding.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
'threadsafe_function_ex/call.cc',
3838
'threadsafe_function_ex/context.cc',
3939
'threadsafe_function_ex/simple.cc',
40+
'threadsafe_function_ex/threadsafe.cc',
4041
'threadsafe_function/threadsafe_function_ctx.cc',
4142
'threadsafe_function/threadsafe_function_existing_tsfn.cc',
4243
'threadsafe_function/threadsafe_function_ptr.cc',

test/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ let testModules = [
4444
'threadsafe_function_ex/call',
4545
'threadsafe_function_ex/context',
4646
'threadsafe_function_ex/simple',
47+
'threadsafe_function_ex/threadsafe',
4748
'threadsafe_function/threadsafe_function_ctx',
4849
'threadsafe_function/threadsafe_function_existing_tsfn',
4950
'threadsafe_function/threadsafe_function_ptr',
@@ -87,6 +88,7 @@ if (napiVersion < 4) {
8788
testModules.splice(testModules.indexOf('threadsafe_function_ex/call'), 1);
8889
testModules.splice(testModules.indexOf('threadsafe_function_ex/context'), 1);
8990
testModules.splice(testModules.indexOf('threadsafe_function_ex/simple'), 1);
91+
testModules.splice(testModules.indexOf('threadsafe_function_ex/threadsafe'), 1);
9092
}
9193

9294
if (napiVersion < 5) {
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
#include <chrono>
2+
#include <thread>
3+
#include "napi.h"
4+
5+
#if (NAPI_VERSION > 3)
6+
7+
using namespace Napi;
8+
9+
constexpr size_t ARRAY_LENGTH = 10;
10+
constexpr size_t MAX_QUEUE_SIZE = 2;
11+
12+
static std::thread threadsEx[2];
13+
static ThreadSafeFunction tsfnEx;
14+
15+
struct ThreadSafeFunctionInfo {
16+
enum CallType {
17+
DEFAULT,
18+
BLOCKING,
19+
NON_BLOCKING
20+
} type;
21+
bool abort;
22+
bool startSecondary;
23+
FunctionReference jsFinalizeCallback;
24+
uint32_t maxQueueSize;
25+
} tsfnInfoEx;
26+
27+
// Thread data to transmit to JS
28+
static int intsEx[ARRAY_LENGTH];
29+
30+
static void SecondaryThreadEx() {
31+
if (tsfnEx.Release() != napi_ok) {
32+
Error::Fatal("SecondaryThread", "ThreadSafeFunction.Release() failed");
33+
}
34+
}
35+
36+
// Source thread producing the data
37+
static void DataSourceThreadEx() {
38+
ThreadSafeFunctionInfo* info = tsfnEx.GetContext();
39+
40+
if (info->startSecondary) {
41+
if (tsfnEx.Acquire() != napi_ok) {
42+
Error::Fatal("DataSourceThread", "ThreadSafeFunction.Acquire() failed");
43+
}
44+
45+
threadsEx[1] = std::thread(SecondaryThreadEx);
46+
}
47+
48+
bool queueWasFull = false;
49+
bool queueWasClosing = false;
50+
for (int index = ARRAY_LENGTH - 1; index > -1 && !queueWasClosing; index--) {
51+
napi_status status = napi_generic_failure;
52+
auto callback = [](Env env, Function jsCallback, int* data) {
53+
jsCallback.Call({ Number::New(env, *data) });
54+
};
55+
56+
switch (info->type) {
57+
case ThreadSafeFunctionInfo::DEFAULT:
58+
status = tsfnEx.BlockingCall();
59+
break;
60+
case ThreadSafeFunctionInfo::BLOCKING:
61+
status = tsfnEx.BlockingCall(&intsEx[index], callback);
62+
break;
63+
case ThreadSafeFunctionInfo::NON_BLOCKING:
64+
status = tsfnEx.NonBlockingCall(&intsEx[index], callback);
65+
break;
66+
}
67+
68+
if (info->maxQueueSize == 0) {
69+
// Let's make this thread really busy for 200 ms to give the main thread a
70+
// chance to abort.
71+
auto start = std::chrono::high_resolution_clock::now();
72+
constexpr auto MS_200 = std::chrono::milliseconds(200);
73+
for (; std::chrono::high_resolution_clock::now() - start < MS_200;);
74+
}
75+
76+
switch (status) {
77+
case napi_queue_full:
78+
queueWasFull = true;
79+
index++;
80+
// fall through
81+
82+
case napi_ok:
83+
continue;
84+
85+
case napi_closing:
86+
queueWasClosing = true;
87+
break;
88+
89+
default:
90+
Error::Fatal("DataSourceThread", "ThreadSafeFunction.*Call() failed");
91+
}
92+
}
93+
94+
if (info->type == ThreadSafeFunctionInfo::NON_BLOCKING && !queueWasFull) {
95+
Error::Fatal("DataSourceThread", "Queue was never full");
96+
}
97+
98+
if (info->abort && !queueWasClosing) {
99+
Error::Fatal("DataSourceThread", "Queue was never closing");
100+
}
101+
102+
if (!queueWasClosing && tsfnEx.Release() != napi_ok) {
103+
Error::Fatal("DataSourceThread", "ThreadSafeFunction.Release() failed");
104+
}
105+
}
106+
107+
static Value StopThreadEx(const CallbackInfo& info) {
108+
tsfnInfoEx.jsFinalizeCallback = Napi::Persistent(info[0].As<Function>());
109+
bool abort = info[1].As<Boolean>();
110+
if (abort) {
111+
tsfnEx.Abort();
112+
} else {
113+
tsfnEx.Release();
114+
}
115+
return Value();
116+
}
117+
118+
// Join the thread and inform JS that we're done.
119+
static void JoinTheThreadsEx(Env /* env */,
120+
std::thread* theThreads,
121+
ThreadSafeFunctionInfo* info) {
122+
theThreads[0].join();
123+
if (info->startSecondary) {
124+
theThreads[1].join();
125+
}
126+
127+
info->jsFinalizeCallback.Call({});
128+
info->jsFinalizeCallback.Reset();
129+
}
130+
131+
static Value StartThreadInternalEx(const CallbackInfo& info,
132+
ThreadSafeFunctionInfo::CallType type) {
133+
tsfnInfoEx.type = type;
134+
tsfnInfoEx.abort = info[1].As<Boolean>();
135+
tsfnInfoEx.startSecondary = info[2].As<Boolean>();
136+
tsfnInfoEx.maxQueueSize = info[3].As<Number>().Uint32Value();
137+
138+
tsfnEx = ThreadSafeFunction::New(info.Env(), info[0].As<Function>(),
139+
"Test", tsfnInfoEx.maxQueueSize, 2, &tsfnInfoEx, JoinTheThreadsEx, threadsEx);
140+
141+
threadsEx[0] = std::thread(DataSourceThreadEx);
142+
143+
return Value();
144+
}
145+
146+
static Value ReleaseEx(const CallbackInfo& /* info */) {
147+
if (tsfnEx.Release() != napi_ok) {
148+
Error::Fatal("Release", "ThreadSafeFunction.Release() failed");
149+
}
150+
return Value();
151+
}
152+
153+
static Value StartThreadEx(const CallbackInfo& info) {
154+
return StartThreadInternalEx(info, ThreadSafeFunctionInfo::BLOCKING);
155+
}
156+
157+
static Value StartThreadNonblockingEx(const CallbackInfo& info) {
158+
return StartThreadInternalEx(info, ThreadSafeFunctionInfo::NON_BLOCKING);
159+
}
160+
161+
static Value StartThreadNoNativeEx(const CallbackInfo& info) {
162+
return StartThreadInternalEx(info, ThreadSafeFunctionInfo::DEFAULT);
163+
}
164+
165+
Object InitThreadSafeFunctionExThreadSafe(Env env) {
166+
for (size_t index = 0; index < ARRAY_LENGTH; index++) {
167+
intsEx[index] = index;
168+
}
169+
170+
Object exports = Object::New(env);
171+
exports["ARRAY_LENGTH"] = Number::New(env, ARRAY_LENGTH);
172+
exports["MAX_QUEUE_SIZE"] = Number::New(env, MAX_QUEUE_SIZE);
173+
exports["startThread"] = Function::New(env, StartThreadEx);
174+
exports["startThreadNoNative"] = Function::New(env, StartThreadNoNativeEx);
175+
exports["startThreadNonblocking"] =
176+
Function::New(env, StartThreadNonblockingEx);
177+
exports["stopThread"] = Function::New(env, StopThreadEx);
178+
exports["release"] = Function::New(env, ReleaseEx);
179+
180+
return exports;
181+
}
182+
183+
#endif
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
'use strict';
2+
3+
const buildType = process.config.target_defaults.default_configuration;
4+
const assert = require('assert');
5+
const common = require('../common');
6+
7+
test(require(`../build/${buildType}/binding.node`));
8+
test(require(`../build/${buildType}/binding_noexcept.node`));
9+
10+
function test(binding) {
11+
const expectedArray = (function(arrayLength) {
12+
const result = [];
13+
for (let index = 0; index < arrayLength; index++) {
14+
result.push(arrayLength - 1 - index);
15+
}
16+
return result;
17+
})(binding.threadsafe_function_ex_threadsafe.ARRAY_LENGTH);
18+
19+
function testWithJSMarshaller({
20+
threadStarter,
21+
quitAfter,
22+
abort,
23+
maxQueueSize,
24+
launchSecondary }) {
25+
return new Promise((resolve) => {
26+
const array = [];
27+
binding.threadsafe_function_ex_threadsafe[threadStarter](function testCallback(value) {
28+
array.push(value);
29+
if (array.length === quitAfter) {
30+
setImmediate(() => {
31+
binding.threadsafe_function_ex_threadsafe.stopThread(common.mustCall(() => {
32+
resolve(array);
33+
}), !!abort);
34+
});
35+
}
36+
}, !!abort, !!launchSecondary, maxQueueSize);
37+
if (threadStarter === 'startThreadNonblocking') {
38+
// Let's make this thread really busy for a short while to ensure that
39+
// the queue fills and the thread receives a napi_queue_full.
40+
const start = Date.now();
41+
while (Date.now() - start < 200);
42+
}
43+
});
44+
}
45+
46+
new Promise(function testWithoutJSMarshaller(resolve) {
47+
let callCount = 0;
48+
binding.threadsafe_function_ex_threadsafe.startThreadNoNative(function testCallback() {
49+
callCount++;
50+
51+
// The default call-into-JS implementation passes no arguments.
52+
assert.strictEqual(arguments.length, 0);
53+
if (callCount === binding.threadsafe_function_ex_threadsafe.ARRAY_LENGTH) {
54+
setImmediate(() => {
55+
binding.threadsafe_function_ex_threadsafe.stopThread(common.mustCall(() => {
56+
resolve();
57+
}), false);
58+
});
59+
}
60+
}, false /* abort */, false /* launchSecondary */,
61+
binding.threadsafe_function_ex_threadsafe.MAX_QUEUE_SIZE);
62+
})
63+
64+
// Start the thread in blocking mode, and assert that all values are passed.
65+
// Quit after it's done.
66+
.then(() => testWithJSMarshaller({
67+
threadStarter: 'startThread',
68+
maxQueueSize: binding.threadsafe_function_ex_threadsafe.MAX_QUEUE_SIZE,
69+
quitAfter: binding.threadsafe_function_ex_threadsafe.ARRAY_LENGTH
70+
}))
71+
.then((result) => assert.deepStrictEqual(result, expectedArray))
72+
73+
// Start the thread in blocking mode with an infinite queue, and assert that
74+
// all values are passed. Quit after it's done.
75+
.then(() => testWithJSMarshaller({
76+
threadStarter: 'startThread',
77+
maxQueueSize: 0,
78+
quitAfter: binding.threadsafe_function_ex_threadsafe.ARRAY_LENGTH
79+
}))
80+
.then((result) => assert.deepStrictEqual(result, expectedArray))
81+
82+
// Start the thread in non-blocking mode, and assert that all values are
83+
// passed. Quit after it's done.
84+
.then(() => testWithJSMarshaller({
85+
threadStarter: 'startThreadNonblocking',
86+
maxQueueSize: binding.threadsafe_function_ex_threadsafe.MAX_QUEUE_SIZE,
87+
quitAfter: binding.threadsafe_function_ex_threadsafe.ARRAY_LENGTH
88+
}))
89+
.then((result) => assert.deepStrictEqual(result, expectedArray))
90+
91+
// Start the thread in blocking mode, and assert that all values are passed.
92+
// Quit early, but let the thread finish.
93+
.then(() => testWithJSMarshaller({
94+
threadStarter: 'startThread',
95+
maxQueueSize: binding.threadsafe_function_ex_threadsafe.MAX_QUEUE_SIZE,
96+
quitAfter: 1
97+
}))
98+
.then((result) => assert.deepStrictEqual(result, expectedArray))
99+
100+
// Start the thread in blocking mode with an infinite queue, and assert that
101+
// all values are passed. Quit early, but let the thread finish.
102+
.then(() => testWithJSMarshaller({
103+
threadStarter: 'startThread',
104+
maxQueueSize: 0,
105+
quitAfter: 1
106+
}))
107+
.then((result) => assert.deepStrictEqual(result, expectedArray))
108+
109+
110+
// Start the thread in non-blocking mode, and assert that all values are
111+
// passed. Quit early, but let the thread finish.
112+
.then(() => testWithJSMarshaller({
113+
threadStarter: 'startThreadNonblocking',
114+
maxQueueSize: binding.threadsafe_function_ex_threadsafe.MAX_QUEUE_SIZE,
115+
quitAfter: 1
116+
}))
117+
.then((result) => assert.deepStrictEqual(result, expectedArray))
118+
119+
// Start the thread in blocking mode, and assert that all values are passed.
120+
// Quit early, but let the thread finish. Launch a secondary thread to test
121+
// the reference counter incrementing functionality.
122+
.then(() => testWithJSMarshaller({
123+
threadStarter: 'startThread',
124+
quitAfter: 1,
125+
maxQueueSize: binding.threadsafe_function_ex_threadsafe.MAX_QUEUE_SIZE,
126+
launchSecondary: true
127+
}))
128+
.then((result) => assert.deepStrictEqual(result, expectedArray))
129+
130+
// Start the thread in non-blocking mode, and assert that all values are
131+
// passed. Quit early, but let the thread finish. Launch a secondary thread
132+
// to test the reference counter incrementing functionality.
133+
.then(() => testWithJSMarshaller({
134+
threadStarter: 'startThreadNonblocking',
135+
quitAfter: 1,
136+
maxQueueSize: binding.threadsafe_function_ex_threadsafe.MAX_QUEUE_SIZE,
137+
launchSecondary: true
138+
}))
139+
.then((result) => assert.deepStrictEqual(result, expectedArray))
140+
141+
// Start the thread in blocking mode, and assert that it could not finish.
142+
// Quit early by aborting.
143+
.then(() => testWithJSMarshaller({
144+
threadStarter: 'startThread',
145+
quitAfter: 1,
146+
maxQueueSize: binding.threadsafe_function_ex_threadsafe.MAX_QUEUE_SIZE,
147+
abort: true
148+
}))
149+
.then((result) => assert.strictEqual(result.indexOf(0), -1))
150+
151+
// Start the thread in blocking mode with an infinite queue, and assert that
152+
// it could not finish. Quit early by aborting.
153+
.then(() => testWithJSMarshaller({
154+
threadStarter: 'startThread',
155+
quitAfter: 1,
156+
maxQueueSize: 0,
157+
abort: true
158+
}))
159+
.then((result) => assert.strictEqual(result.indexOf(0), -1))
160+
161+
// Start the thread in non-blocking mode, and assert that it could not finish.
162+
// Quit early and aborting.
163+
.then(() => testWithJSMarshaller({
164+
threadStarter: 'startThreadNonblocking',
165+
quitAfter: 1,
166+
maxQueueSize: binding.threadsafe_function_ex_threadsafe.MAX_QUEUE_SIZE,
167+
abort: true
168+
}))
169+
.then((result) => assert.strictEqual(result.indexOf(0), -1))
170+
}

0 commit comments

Comments
 (0)