Skip to content

Commit 2e71842

Browse files
KevinEadymhdawson
authored andcommitted
tsfn: Implement copy constructor
* tsfn: Implement copy constructor Refs: #524 PR-URL: #546 Reviewed-By: Michael Dawson <[email protected]> Reviewed-By: Chengzhong Wu <[email protected]>
1 parent 650562c commit 2e71842

File tree

7 files changed

+284
-34
lines changed

7 files changed

+284
-34
lines changed

napi-inl.h

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4025,29 +4025,16 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
40254025
}
40264026

40274027
inline ThreadSafeFunction::ThreadSafeFunction()
4028-
: _tsfn(new napi_threadsafe_function(nullptr), _d) {
4028+
: _tsfn() {
40294029
}
40304030

40314031
inline ThreadSafeFunction::ThreadSafeFunction(
40324032
napi_threadsafe_function tsfn)
4033-
: _tsfn(new napi_threadsafe_function(tsfn), _d) {
4033+
: _tsfn(tsfn) {
40344034
}
40354035

4036-
inline ThreadSafeFunction::ThreadSafeFunction(ThreadSafeFunction&& other)
4037-
: _tsfn(std::move(other._tsfn)) {
4038-
other._tsfn.reset();
4039-
}
4040-
4041-
inline ThreadSafeFunction& ThreadSafeFunction::operator =(
4042-
ThreadSafeFunction&& other) {
4043-
if (*_tsfn != nullptr) {
4044-
Error::Fatal("ThreadSafeFunction::operator =",
4045-
"You cannot assign a new TSFN because existing one is still alive.");
4046-
return *this;
4047-
}
4048-
_tsfn = std::move(other._tsfn);
4049-
other._tsfn.reset();
4050-
return *this;
4036+
inline ThreadSafeFunction::operator napi_threadsafe_function() const {
4037+
return _tsfn;
40514038
}
40524039

40534040
inline napi_status ThreadSafeFunction::BlockingCall() const {
@@ -4090,34 +4077,34 @@ inline napi_status ThreadSafeFunction::NonBlockingCall(
40904077

40914078
inline void ThreadSafeFunction::Ref(napi_env env) const {
40924079
if (_tsfn != nullptr) {
4093-
napi_status status = napi_ref_threadsafe_function(env, *_tsfn);
4080+
napi_status status = napi_ref_threadsafe_function(env, _tsfn);
40944081
NAPI_THROW_IF_FAILED_VOID(env, status);
40954082
}
40964083
}
40974084

40984085
inline void ThreadSafeFunction::Unref(napi_env env) const {
40994086
if (_tsfn != nullptr) {
4100-
napi_status status = napi_unref_threadsafe_function(env, *_tsfn);
4087+
napi_status status = napi_unref_threadsafe_function(env, _tsfn);
41014088
NAPI_THROW_IF_FAILED_VOID(env, status);
41024089
}
41034090
}
41044091

41054092
inline napi_status ThreadSafeFunction::Acquire() const {
4106-
return napi_acquire_threadsafe_function(*_tsfn);
4093+
return napi_acquire_threadsafe_function(_tsfn);
41074094
}
41084095

41094096
inline napi_status ThreadSafeFunction::Release() {
4110-
return napi_release_threadsafe_function(*_tsfn, napi_tsfn_release);
4097+
return napi_release_threadsafe_function(_tsfn, napi_tsfn_release);
41114098
}
41124099

41134100
inline napi_status ThreadSafeFunction::Abort() {
4114-
return napi_release_threadsafe_function(*_tsfn, napi_tsfn_abort);
4101+
return napi_release_threadsafe_function(_tsfn, napi_tsfn_abort);
41154102
}
41164103

41174104
inline ThreadSafeFunction::ConvertibleContext
41184105
ThreadSafeFunction::GetContext() const {
41194106
void* context;
4120-
napi_get_threadsafe_function_context(*_tsfn, &context);
4107+
napi_get_threadsafe_function_context(_tsfn, &context);
41214108
return ConvertibleContext({ context });
41224109
}
41234110

@@ -4140,10 +4127,10 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
41404127

41414128
ThreadSafeFunction tsfn;
41424129
auto* finalizeData = new details::ThreadSafeFinalize<ContextType, Finalizer,
4143-
FinalizerDataType>({ data, finalizeCallback, tsfn._tsfn.get() });
4130+
FinalizerDataType>({ data, finalizeCallback, &tsfn._tsfn });
41444131
napi_status status = napi_create_threadsafe_function(env, callback, resource,
41454132
Value::From(env, resourceName), maxQueueSize, initialThreadCount,
4146-
finalizeData, wrapper, context, CallJS, tsfn._tsfn.get());
4133+
finalizeData, wrapper, context, CallJS, &tsfn._tsfn);
41474134
if (status != napi_ok) {
41484135
delete finalizeData;
41494136
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
@@ -4156,7 +4143,7 @@ inline napi_status ThreadSafeFunction::CallInternal(
41564143
CallbackWrapper* callbackWrapper,
41574144
napi_threadsafe_function_call_mode mode) const {
41584145
napi_status status = napi_call_threadsafe_function(
4159-
*_tsfn, callbackWrapper, mode);
4146+
_tsfn, callbackWrapper, mode);
41604147
if (status != napi_ok && callbackWrapper != nullptr) {
41614148
delete callbackWrapper;
41624149
}

napi.h

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2009,8 +2009,7 @@ namespace Napi {
20092009
ThreadSafeFunction();
20102010
ThreadSafeFunction(napi_threadsafe_function tsFunctionValue);
20112011

2012-
ThreadSafeFunction(ThreadSafeFunction&& other);
2013-
ThreadSafeFunction& operator=(ThreadSafeFunction&& other);
2012+
operator napi_threadsafe_function() const;
20142013

20152014
// This API may be called from any thread.
20162015
napi_status BlockingCall() const;
@@ -2082,13 +2081,8 @@ namespace Napi {
20822081
napi_value jsCallback,
20832082
void* context,
20842083
void* data);
2085-
struct Deleter {
2086-
// napi_threadsafe_function is managed by Node.js, leave it alone.
2087-
void operator()(napi_threadsafe_function*) const {};
2088-
};
20892084

2090-
std::unique_ptr<napi_threadsafe_function, Deleter> _tsfn;
2091-
Deleter _d;
2085+
napi_threadsafe_function _tsfn;
20922086
};
20932087

20942088
template<class T>

test/binding.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Object InitObjectDeprecated(Env env);
4141
Object InitPromise(Env env);
4242
#if (NAPI_VERSION > 3)
4343
Object InitThreadSafeFunctionPtr(Env env);
44+
Object InitThreadSafeFunctionSum(Env env);
4445
Object InitThreadSafeFunctionUnref(Env env);
4546
Object InitThreadSafeFunction(Env env);
4647
#endif
@@ -90,6 +91,7 @@ Object Init(Env env, Object exports) {
9091
exports.Set("promise", InitPromise(env));
9192
#if (NAPI_VERSION > 3)
9293
exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env));
94+
exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env));
9395
exports.Set("threadsafe_function_unref", InitThreadSafeFunctionUnref(env));
9496
exports.Set("threadsafe_function", InitThreadSafeFunction(env));
9597
#endif

test/binding.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
'object/set_property.cc',
3737
'promise.cc',
3838
'threadsafe_function/threadsafe_function_ptr.cc',
39+
'threadsafe_function/threadsafe_function_sum.cc',
3940
'threadsafe_function/threadsafe_function_unref.cc',
4041
'threadsafe_function/threadsafe_function.cc',
4142
'typedarray.cc',

test/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ let testModules = [
4040
'object/set_property',
4141
'promise',
4242
'threadsafe_function/threadsafe_function_ptr',
43+
'threadsafe_function/threadsafe_function_sum',
4344
'threadsafe_function/threadsafe_function_unref',
4445
'threadsafe_function/threadsafe_function',
4546
'typedarray',
@@ -71,6 +72,7 @@ if ((process.env.npm_config_NAPI_VERSION !== undefined) &&
7172
(process.env.npm_config_NAPI_VERSION < 4)) {
7273
testModules.splice(testModules.indexOf('asyncprogressworker'), 1);
7374
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ptr'), 1);
75+
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_sum'), 1);
7476
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_unref'), 1);
7577
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function'), 1);
7678
}
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
#include "napi.h"
2+
#include <thread>
3+
#include <cstdlib>
4+
#include <condition_variable>
5+
#include <mutex>
6+
7+
#if (NAPI_VERSION > 3)
8+
9+
using namespace Napi;
10+
11+
namespace {
12+
13+
struct TestData {
14+
15+
TestData(Promise::Deferred&& deferred) : deferred(std::move(deferred)) {};
16+
17+
// Native Promise returned to JavaScript
18+
Promise::Deferred deferred;
19+
20+
// List of threads created for test. This list only ever accessed via main
21+
// thread.
22+
std::vector<std::thread> threads = {};
23+
24+
ThreadSafeFunction tsfn = ThreadSafeFunction();
25+
};
26+
27+
void FinalizerCallback(Napi::Env env, TestData* finalizeData){
28+
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
29+
finalizeData->threads[i].join();
30+
}
31+
finalizeData->deferred.Resolve(Boolean::New(env,true));
32+
delete finalizeData;
33+
}
34+
35+
/**
36+
* See threadsafe_function_sum.js for descriptions of the tests in this file
37+
*/
38+
39+
void entryWithTSFN(ThreadSafeFunction tsfn, int threadId) {
40+
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
41+
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
42+
callback.Call( { Number::New(env, static_cast<double>(threadId))});
43+
});
44+
tsfn.Release();
45+
}
46+
47+
static Value TestWithTSFN(const CallbackInfo& info) {
48+
int threadCount = info[0].As<Number>().Int32Value();
49+
Function cb = info[1].As<Function>();
50+
51+
// We pass the test data to the Finalizer for cleanup. The finalizer is
52+
// responsible for deleting this data as well.
53+
TestData *testData = new TestData(Promise::Deferred::New(info.Env()));
54+
55+
ThreadSafeFunction tsfn = ThreadSafeFunction::New(
56+
info.Env(), cb, "Test", 0, threadCount,
57+
std::function<decltype(FinalizerCallback)>(FinalizerCallback), testData);
58+
59+
for (int i = 0; i < threadCount; ++i) {
60+
// A copy of the ThreadSafeFunction will go to the thread entry point
61+
testData->threads.push_back( std::thread(entryWithTSFN, tsfn, i) );
62+
}
63+
64+
return testData->deferred.Promise();
65+
}
66+
67+
// Task instance created for each new std::thread
68+
class DelayedTSFNTask {
69+
public:
70+
// Each instance has its own tsfn
71+
ThreadSafeFunction tsfn;
72+
73+
// Thread-safety
74+
std::mutex mtx;
75+
std::condition_variable cv;
76+
77+
// Entry point for std::thread
78+
void entryDelayedTSFN(int threadId) {
79+
std::unique_lock<std::mutex> lk(mtx);
80+
cv.wait(lk);
81+
tsfn.BlockingCall([=](Napi::Env env, Function callback) {
82+
callback.Call({Number::New(env, static_cast<double>(threadId))});
83+
});
84+
tsfn.Release();
85+
};
86+
};
87+
88+
struct TestDataDelayed {
89+
90+
TestDataDelayed(Promise::Deferred &&deferred)
91+
: deferred(std::move(deferred)){};
92+
~TestDataDelayed() { taskInsts.clear(); };
93+
// Native Promise returned to JavaScript
94+
Promise::Deferred deferred;
95+
96+
// List of threads created for test. This list only ever accessed via main
97+
// thread.
98+
std::vector<std::thread> threads = {};
99+
100+
// List of DelayedTSFNThread instances
101+
std::vector<std::unique_ptr<DelayedTSFNTask>> taskInsts = {};
102+
103+
ThreadSafeFunction tsfn = ThreadSafeFunction();
104+
};
105+
106+
void FinalizerCallbackDelayed(Napi::Env env, TestDataDelayed *finalizeData) {
107+
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
108+
finalizeData->threads[i].join();
109+
}
110+
finalizeData->deferred.Resolve(Boolean::New(env, true));
111+
delete finalizeData;
112+
}
113+
114+
static Value TestDelayedTSFN(const CallbackInfo &info) {
115+
int threadCount = info[0].As<Number>().Int32Value();
116+
Function cb = info[1].As<Function>();
117+
118+
TestDataDelayed *testData =
119+
new TestDataDelayed(Promise::Deferred::New(info.Env()));
120+
121+
testData->tsfn =
122+
ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount,
123+
std::function<decltype(FinalizerCallbackDelayed)>(
124+
FinalizerCallbackDelayed),
125+
testData);
126+
127+
for (int i = 0; i < threadCount; ++i) {
128+
testData->taskInsts.push_back(
129+
std::unique_ptr<DelayedTSFNTask>(new DelayedTSFNTask()));
130+
testData->threads.push_back(std::thread(&DelayedTSFNTask::entryDelayedTSFN,
131+
testData->taskInsts.back().get(),
132+
i));
133+
}
134+
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
135+
136+
for (auto &task : testData->taskInsts) {
137+
std::lock_guard<std::mutex> lk(task->mtx);
138+
task->tsfn = testData->tsfn;
139+
task->cv.notify_all();
140+
}
141+
142+
return testData->deferred.Promise();
143+
}
144+
145+
void entryAcquire(ThreadSafeFunction tsfn, int threadId) {
146+
tsfn.Acquire();
147+
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1));
148+
tsfn.BlockingCall( [=](Napi::Env env, Function callback) {
149+
callback.Call( { Number::New(env, static_cast<double>(threadId))});
150+
});
151+
tsfn.Release();
152+
}
153+
154+
static Value CreateThread(const CallbackInfo& info) {
155+
TestData* testData = static_cast<TestData*>(info.Data());
156+
ThreadSafeFunction tsfn = testData->tsfn;
157+
int threadId = testData->threads.size();
158+
// A copy of the ThreadSafeFunction will go to the thread entry point
159+
testData->threads.push_back( std::thread(entryAcquire, tsfn, threadId) );
160+
return Number::New(info.Env(), threadId);
161+
}
162+
163+
static Value StopThreads(const CallbackInfo& info) {
164+
TestData* testData = static_cast<TestData*>(info.Data());
165+
ThreadSafeFunction tsfn = testData->tsfn;
166+
tsfn.Release();
167+
return info.Env().Undefined();
168+
}
169+
170+
static Value TestAcquire(const CallbackInfo& info) {
171+
Function cb = info[0].As<Function>();
172+
Napi::Env env = info.Env();
173+
174+
// We pass the test data to the Finalizer for cleanup. The finalizer is
175+
// responsible for deleting this data as well.
176+
TestData *testData = new TestData(Promise::Deferred::New(info.Env()));
177+
178+
testData->tsfn = ThreadSafeFunction::New(
179+
env, cb, "Test", 0, 1,
180+
std::function<decltype(FinalizerCallback)>(FinalizerCallback), testData);
181+
182+
Object result = Object::New(env);
183+
result["createThread"] = Function::New( env, CreateThread, "createThread", testData);
184+
result["stopThreads"] = Function::New( env, StopThreads, "stopThreads", testData);
185+
result["promise"] = testData->deferred.Promise();
186+
187+
return result;
188+
}
189+
}
190+
191+
Object InitThreadSafeFunctionSum(Env env) {
192+
Object exports = Object::New(env);
193+
exports["testDelayedTSFN"] = Function::New(env, TestDelayedTSFN);
194+
exports["testWithTSFN"] = Function::New(env, TestWithTSFN);
195+
exports["testAcquire"] = Function::New(env, TestAcquire);
196+
return exports;
197+
}
198+
199+
#endif

0 commit comments

Comments
 (0)