Skip to content

Commit b0e7817

Browse files
committed
basic multi-threading
1 parent a01c3c8 commit b0e7817

File tree

2 files changed

+252
-24
lines changed

2 files changed

+252
-24
lines changed
Lines changed: 248 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,64 @@
1-
#include <array>
2-
#include "napi.h"
31
#include "../util/util.h"
2+
#include "napi.h"
3+
#include <array>
4+
#include <future>
5+
#include <iostream>
6+
#include <thread>
7+
#include <vector>
8+
9+
static constexpr auto DEFAULT_THREAD_COUNT = 10U;
10+
static constexpr auto DEFAULT_CALL_COUNT = 2;
11+
12+
13+
static struct {
14+
bool logCall = true; // Uses JS console.log to output when the TSFN is
15+
// processing the NonBlockingCall().
16+
bool logThread = false; // Uses native std::cout to output when the thread's
17+
// NonBlockingCall() request has finished.
18+
} DefaultOptions; // Options from Start()
19+
20+
/**
21+
* @brief Macro used specifically to support the dual CI test / documentation
22+
* example setup. Exceptions are always thrown as JavaScript exceptions when
23+
* running in example mode.
24+
*
25+
*/
26+
#define TSFN_THROW(tsfnWrap, e, ...) \
27+
if (tsfnWrap->cppExceptions) { \
28+
do { \
29+
(e).ThrowAsJavaScriptException(); \
30+
return __VA_ARGS__; \
31+
} while (0); \
32+
} else { \
33+
NAPI_THROW(e, __VA_ARGS__); \
34+
}
435

536
#if (NAPI_VERSION > 3)
637

738
using namespace Napi;
839

940
namespace example {
1041

42+
class TSFNWrap;
43+
1144
// Context of the TSFN.
12-
using Context = Reference<Napi::Value>;
45+
using Context = TSFNWrap;
46+
47+
using CompletionHandler = std::function<void()>;
1348

1449
// Data passed (as pointer) to [Non]BlockingCall
15-
using DataType = Promise::Deferred;
50+
struct DataType {
51+
// Promise::Deferred;
52+
// CompletionHandler handler;
53+
std::future<int> deferred;
54+
};
1655

1756
// CallJs callback function
1857
static void CallJs(Napi::Env env, Napi::Function /*jsCallback*/,
1958
Context *context, DataType *data) {
2059
if (env != nullptr) {
2160
if (data != nullptr) {
22-
data->Resolve(context->Value());
61+
// data->Resolve(context->Value());
2362
}
2463
}
2564
if (data != nullptr) {
@@ -30,54 +69,241 @@ static void CallJs(Napi::Env env, Napi::Function /*jsCallback*/,
3069
// Full type of the ThreadSafeFunctionEx
3170
using TSFN = ThreadSafeFunctionEx<Context, DataType, CallJs>;
3271

33-
class TSFNWrap;
72+
struct FinalizerDataType {
73+
std::vector<std::thread> threads;
74+
std::unique_ptr<Promise::Deferred> deferred;
75+
// struct {
76+
// bool logCall = true; // Uses JS console.log to output when the TSFN is
77+
// // processing the NonBlockingCall().
78+
// bool logThread = false; // Uses native std::cout to output when the thread's
79+
// // NonBlockingCall() request has finished.
80+
// } options; // Options from Start()
81+
};
82+
83+
static void threadEntry(size_t threadId, TSFN tsfn, uint32_t callCount,
84+
bool logThread) {
85+
using namespace std::chrono_literals;
86+
for (auto i = 0U; i < callCount; ++i) {
87+
// auto callData = new DataType();
88+
// tsfn.NonBlockingCall(callData);
89+
// auto result = callData->deferred.get();
90+
// if (logThread) {
91+
// std::cout << "Thread " << threadId << " got result " << result << "\n";
92+
// }
93+
94+
// std::this_thread::sleep_for(50ms * threadId);
95+
}
96+
std::cout << "Thread " << threadId << "finished\n";
97+
tsfn.Release();
98+
}
99+
34100
using base = tsfnutil::TSFNWrapBase<TSFNWrap, Context, TSFN>;
35101

36102
// A JS-accessible wrap that holds the TSFN.
37103
class TSFNWrap : public base {
38104
public:
39105
TSFNWrap(const CallbackInfo &info) : base(info) {
106+
if (info.Length() > 0 && info[0].IsObject()) {
107+
auto arg0 = info[0].ToObject();
108+
if (arg0.Has("cppExceptions")) {
109+
auto cppExceptions = arg0.Get("cppExceptions");
110+
if (cppExceptions.IsBoolean()) {
111+
cppExceptions = cppExceptions.As<Boolean>();
112+
} else {
113+
// We explicitly use the addon's except/noexcept settings here, since
114+
// we don't have a valid setting.
115+
Napi::TypeError::New(Env(), "cppExceptions is not a boolean")
116+
.ThrowAsJavaScriptException();
117+
}
118+
}
119+
}
120+
}
121+
~TSFNWrap() {
122+
for (auto& thread : finalizerData->threads) {
123+
if (thread.joinable()) {
124+
thread.join();
125+
}
126+
}
127+
}
128+
129+
static std::array<ClassPropertyDescriptor<TSFNWrap>, 3> InstanceMethods() {
130+
return {InstanceMethod("call", &TSFNWrap::Call),
131+
InstanceMethod("start", &TSFNWrap::Start),
132+
InstanceMethod("release", &TSFNWrap::Release)};
133+
}
134+
135+
bool cppExceptions = false;
136+
std::shared_ptr<FinalizerDataType> finalizerData;
137+
138+
Napi::Value Start(const CallbackInfo &info) {
40139
Napi::Env env = info.Env();
41140

42-
Context *context = new Context(Persistent(info[0]));
141+
if (_tsfn) {
142+
TSFN_THROW(this, Napi::Error::New(Env(), "TSFN already exists."),
143+
Value());
144+
}
145+
146+
// Creates a list to hold how many times each thread should make a call.
147+
std::vector<int32_t> callCounts;
148+
149+
// The JS-provided callback to execute for each call (if provided)
150+
Function callback;
151+
152+
// std::unique_ptr<FinalizerDataType> finalizerData =
153+
// std::make_unique<FinalizerDataType>();
154+
155+
// finalizerData = std::shared_ptr<FinalizerDataType>(FinalizerDataType{ std::vector<std::thread>() , Promise::Deferred::New(env) });
156+
157+
finalizerData = std::make_shared<FinalizerDataType>();
158+
159+
160+
bool logThread = DefaultOptions.logThread;
161+
bool logCall = DefaultOptions.logCall;
162+
163+
if (info.Length() > 0 && info[0].IsObject()) {
164+
auto arg0 = info[0].ToObject();
165+
if (arg0.Has("threads")) {
166+
Napi::Value threads = arg0.Get("threads");
167+
if (threads.IsArray()) {
168+
Napi::Array threadsArray = threads.As<Napi::Array>();
169+
for (auto i = 0U; i < threadsArray.Length(); ++i) {
170+
Napi::Value elem = threadsArray.Get(i);
171+
if (elem.IsNumber()) {
172+
callCounts.push_back(elem.As<Number>().Int32Value());
173+
} else {
174+
TSFN_THROW(this, Napi::TypeError::New(Env(), "Invalid arguments"),
175+
Value());
176+
}
177+
}
178+
} else if (threads.IsNumber()) {
179+
auto threadCount = threads.As<Number>().Int32Value();
180+
for (auto i = 0; i < threadCount; ++i) {
181+
callCounts.push_back(DEFAULT_CALL_COUNT);
182+
}
183+
} else {
184+
TSFN_THROW(this, Napi::TypeError::New(Env(), "Invalid arguments"),
185+
Value());
186+
}
187+
}
188+
189+
if (arg0.Has("callback")) {
190+
auto cb = arg0.Get("callback");
191+
if (cb.IsFunction()) {
192+
callback = cb.As<Function>();
193+
} else {
194+
TSFN_THROW(this,
195+
Napi::TypeError::New(Env(), "Callback is not a function"),
196+
Value());
197+
}
198+
}
199+
200+
if (arg0.Has("logCall")) {
201+
auto logCallOption = arg0.Get("logCall");
202+
if (logCallOption.IsBoolean()) {
203+
logCall = logCallOption.As<Boolean>();
204+
} else {
205+
TSFN_THROW(this,
206+
Napi::TypeError::New(Env(), "logCall is not a boolean"),
207+
Value());
208+
}
209+
}
210+
211+
if (arg0.Has("logThread")) {
212+
auto logThreadOption = arg0.Get("logThread");
213+
if (logThreadOption.IsBoolean()) {
214+
logThread = logThreadOption.As<Boolean>();
215+
} else {
216+
TSFN_THROW(this,
217+
Napi::TypeError::New(Env(), "logThread is not a boolean"),
218+
Value());
219+
}
220+
}
221+
}
222+
223+
224+
// Apply default arguments
225+
if (callCounts.size() == 0) {
226+
for (auto i = 0U; i < DEFAULT_THREAD_COUNT; ++i) {
227+
callCounts.push_back(DEFAULT_CALL_COUNT);
228+
}
229+
}
230+
231+
const auto threadCount = callCounts.size();
232+
233+
auto *finalizerDataPtr = new std::shared_ptr<FinalizerDataType>(finalizerData);
43234

44235
_tsfn = TSFN::New(
45236
env, // napi_env env,
46237
TSFN::DefaultFunctionFactory(env), // const Function& callback,
47238
Value(), // const Object& resource,
48239
"Test", // ResourceString resourceName,
49240
0, // size_t maxQueueSize,
50-
1, // size_t initialThreadCount,
51-
context, // Context* context,
52-
base::Finalizer, // Finalizer finalizer
53-
&_deferred // FinalizerDataType data
241+
threadCount + 1, // size_t initialThreadCount, +1 for Node thread
242+
this, // Context* context,
243+
Finalizer, // Finalizer finalizer
244+
finalizerDataPtr // FinalizerDataType* data
54245
);
55-
}
56246

57-
static std::array<ClassPropertyDescriptor<TSFNWrap>, 3> InstanceMethods() {
58-
return {InstanceMethod("call", &TSFNWrap::Call),
59-
InstanceMethod("getContext", &TSFNWrap::GetContext),
60-
InstanceMethod("release", &TSFNWrap::Release)};
247+
for (auto threadId = 0U; threadId < threadCount; ++threadId) {
248+
finalizerData->threads.push_back(
249+
std::thread(threadEntry, threadId, _tsfn, callCounts[threadId],
250+
logThread));
251+
}
252+
253+
254+
return String::New(env, "started");
255+
};
256+
257+
// TSFN finalizer. Resolves the Promise returned by `Release()` above.
258+
static void Finalizer(Napi::Env env, std::shared_ptr<FinalizerDataType> *finalizeData,
259+
Context *ctx) {
260+
// for (auto thread : finalizeData->threads) {
261+
262+
for (auto &thread : (*finalizeData)->threads) {
263+
std::cout << "Finalizer joining thread\n";
264+
if (thread.joinable()) {
265+
thread.join();
266+
}
267+
}
268+
269+
delete finalizeData;
270+
271+
// }
272+
// if (deferred->get()) {
273+
// (*deferred)->Resolve(Boolean::New(env, true));
274+
// deferred->release();
275+
276+
// }
61277
}
62278

279+
Napi::Value Release(const CallbackInfo &info) {
280+
if (finalizerData->deferred) {
281+
return finalizerData->deferred->Promise();
282+
}
283+
// return finalizerData->deferred.Promise();
284+
auto env = info.Env();
285+
finalizerData->deferred.reset(new Promise::Deferred(Promise::Deferred::New(env)));
286+
_tsfn.Release();
287+
return finalizerData->deferred->Promise();
288+
};
289+
63290
Napi::Value Call(const CallbackInfo &info) {
64-
auto *callData = new DataType(info.Env());
65-
_tsfn.NonBlockingCall(callData);
66-
return callData->Promise();
291+
// auto *callData = new DataType(info.Env());
292+
// _tsfn.NonBlockingCall(callData);
293+
// return callData->Promise();
294+
return info.Env().Undefined();
67295
};
68296

69297
Napi::Value GetContext(const CallbackInfo &) {
70298
return _tsfn.GetContext()->Value();
71299
};
72300
};
73-
} // namespace context
74-
301+
} // namespace example
75302

76303
Object InitThreadSafeFunctionExExample(Env env) {
77304
auto exports(Object::New(env));
78305
example::TSFNWrap::Init(env, exports, "example");
79306
return exports;
80307
}
81308

82-
83309
#endif

test/threadsafe_function_ex/test/example.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ class ExampleTest extends TestRunner {
99

1010
async example({ TSFNWrap }) {
1111
const ctx = {};
12+
console.log("starting");
1213
const tsfn = new TSFNWrap(ctx);
13-
assert(ctx === await tsfn.call(), "getContextByCall context not equal");
14-
assert(ctx === tsfn.getContext(), "getContextFromTsfn context not equal");
14+
console.log("tsfn is", tsfn);
15+
console.log("start is", tsfn.start());
16+
console.log();
1517
return await tsfn.release();
1618
}
1719

0 commit comments

Comments
 (0)