9
9
static constexpr auto DEFAULT_THREAD_COUNT = 10U ;
10
10
static constexpr auto DEFAULT_CALL_COUNT = 2 ;
11
11
12
-
13
- static struct {
14
- bool logCall = true ; // Uses JS console.log to output when the TSFN is
15
- // processing the NonBlockingCall().
16
- bool logThread = false ; // Uses native std::cout to output when the thread's
17
- // NonBlockingCall() request has finished.
18
- } DefaultOptions; // Options from Start()
12
+ static struct {
13
+ bool logCall = true ; // Uses JS console.log to output when the TSFN is
14
+ // processing the NonBlockingCall().
15
+ bool logThread = false ; // Uses native std::cout to output when the thread's
16
+ // NonBlockingCall() request has finished.
17
+ } DefaultOptions; // Options from Start()
19
18
20
19
/* *
21
20
* @brief Macro used specifically to support the dual CI test / documentation
@@ -44,59 +43,26 @@ class TSFNWrap;
44
43
// Context of the TSFN.
45
44
using Context = TSFNWrap;
46
45
47
- using CompletionHandler = std::function<void ()>;
48
-
49
46
// Data passed (as pointer) to [Non]BlockingCall
50
- struct DataType {
51
- // Promise::Deferred;
52
- // CompletionHandler handler;
53
- std::future<int > deferred;
54
- };
47
+ using DataType = std::unique_ptr<std::promise<int >>;
55
48
56
49
// CallJs callback function
57
50
static void CallJs (Napi::Env env, Napi::Function /* jsCallback*/ ,
58
51
Context *context, DataType *data) {
59
- if (env != nullptr ) {
60
- if (data != nullptr ) {
61
- // data->Resolve(context->Value());
62
- }
63
- }
64
52
if (data != nullptr ) {
65
- delete data;
53
+ if (env != nullptr ) {
54
+ (*data)->set_value (clock ());
55
+ } else {
56
+ (*data)->set_exception (std::make_exception_ptr (
57
+ std::runtime_error (" TSFN has been finalized." )));
58
+ }
66
59
}
60
+ // We do NOT delete data as it is a unique_ptr held by the calling thread.
67
61
}
68
62
69
63
// Full type of the ThreadSafeFunctionEx
70
64
using TSFN = ThreadSafeFunctionEx<Context, DataType, CallJs>;
71
65
72
- struct FinalizerDataType {
73
- std::vector<std::thread> threads;
74
- std::unique_ptr<Promise::Deferred> deferred;
75
- // struct {
76
- // bool logCall = true; // Uses JS console.log to output when the TSFN is
77
- // // processing the NonBlockingCall().
78
- // bool logThread = false; // Uses native std::cout to output when the thread's
79
- // // NonBlockingCall() request has finished.
80
- // } options; // Options from Start()
81
- };
82
-
83
- static void threadEntry (size_t threadId, TSFN tsfn, uint32_t callCount,
84
- bool logThread) {
85
- using namespace std ::chrono_literals;
86
- for (auto i = 0U ; i < callCount; ++i) {
87
- // auto callData = new DataType();
88
- // tsfn.NonBlockingCall(callData);
89
- // auto result = callData->deferred.get();
90
- // if (logThread) {
91
- // std::cout << "Thread " << threadId << " got result " << result << "\n";
92
- // }
93
-
94
- // std::this_thread::sleep_for(50ms * threadId);
95
- }
96
- std::cout << " Thread " << threadId << " finished\n " ;
97
- tsfn.Release ();
98
- }
99
-
100
66
using base = tsfnutil::TSFNWrapBase<TSFNWrap, Context, TSFN>;
101
67
102
68
// A JS-accessible wrap that holds the TSFN.
@@ -119,20 +85,48 @@ class TSFNWrap : public base {
119
85
}
120
86
}
121
87
~TSFNWrap () {
122
- for (auto & thread : finalizerData->threads ) {
88
+ for (auto & thread : finalizerData->threads ) {
123
89
if (thread.joinable ()) {
124
90
thread.join ();
125
91
}
126
92
}
127
93
}
128
94
95
+ struct FinalizerDataType {
96
+ std::vector<std::thread> threads;
97
+ std::unique_ptr<Promise::Deferred> deferred;
98
+ };
99
+
100
+ // The finalizer data is shared, because we want to join the threads if our
101
+ // TSFNWrap object gets garbage-collected and there are still active threads.
102
+ using SharedFinalizerDataType = std::shared_ptr<FinalizerDataType>;
103
+
104
+ static void threadEntry (size_t threadId, TSFN tsfn, uint32_t callCount,
105
+ bool logThread) {
106
+ using namespace std ::chrono_literals;
107
+ for (auto i = 0U ; i < callCount; ++i) {
108
+ auto promise = std::make_unique<std::promise<int >>();
109
+ tsfn.NonBlockingCall (&promise);
110
+ auto future = promise->get_future ();
111
+ auto result = future.get ();
112
+ if (logThread) {
113
+ std::cout << " Thread " << threadId << " got result " << result << " \n " ;
114
+ }
115
+ }
116
+ if (logThread) {
117
+ std::cout << " Thread " << threadId << " finished\n " ;
118
+ }
119
+ tsfn.Release ();
120
+ }
121
+
129
122
static std::array<ClassPropertyDescriptor<TSFNWrap>, 3 > InstanceMethods () {
130
123
return {InstanceMethod (" call" , &TSFNWrap::Call),
131
124
InstanceMethod (" start" , &TSFNWrap::Start),
132
125
InstanceMethod (" release" , &TSFNWrap::Release)};
133
126
}
134
127
135
128
bool cppExceptions = false ;
129
+ bool logThread;
136
130
std::shared_ptr<FinalizerDataType> finalizerData;
137
131
138
132
Napi::Value Start (const CallbackInfo &info) {
@@ -149,15 +143,9 @@ class TSFNWrap : public base {
149
143
// The JS-provided callback to execute for each call (if provided)
150
144
Function callback;
151
145
152
- // std::unique_ptr<FinalizerDataType> finalizerData =
153
- // std::make_unique<FinalizerDataType>();
154
-
155
- // finalizerData = std::shared_ptr<FinalizerDataType>(FinalizerDataType{ std::vector<std::thread>() , Promise::Deferred::New(env) });
156
-
157
146
finalizerData = std::make_shared<FinalizerDataType>();
158
147
159
-
160
- bool logThread = DefaultOptions.logThread ;
148
+ logThread = DefaultOptions.logThread ;
161
149
bool logCall = DefaultOptions.logCall ;
162
150
163
151
if (info.Length () > 0 && info[0 ].IsObject ()) {
@@ -220,7 +208,6 @@ class TSFNWrap : public base {
220
208
}
221
209
}
222
210
223
-
224
211
// Apply default arguments
225
212
if (callCounts.size () == 0 ) {
226
213
for (auto i = 0U ; i < DEFAULT_THREAD_COUNT; ++i) {
@@ -230,50 +217,48 @@ class TSFNWrap : public base {
230
217
231
218
const auto threadCount = callCounts.size ();
232
219
233
- auto *finalizerDataPtr = new std::shared_ptr<FinalizerDataType> (finalizerData);
220
+ auto *finalizerDataPtr = new SharedFinalizerDataType (finalizerData);
234
221
235
222
_tsfn = TSFN::New (
236
223
env, // napi_env env,
237
224
TSFN::DefaultFunctionFactory (env), // const Function& callback,
238
225
Value (), // const Object& resource,
239
226
" Test" , // ResourceString resourceName,
240
227
0 , // size_t maxQueueSize,
241
- threadCount + 1 , // size_t initialThreadCount, +1 for Node thread
242
- this , // Context* context,
243
- Finalizer, // Finalizer finalizer
228
+ threadCount + 1 , // size_t initialThreadCount, +1 for Node thread
229
+ this , // Context* context,
230
+ Finalizer, // Finalizer finalizer
244
231
finalizerDataPtr // FinalizerDataType* data
245
232
);
246
233
247
234
for (auto threadId = 0U ; threadId < threadCount; ++threadId) {
248
- finalizerData->threads .push_back (
249
- std::thread (threadEntry, threadId, _tsfn, callCounts[threadId],
250
- logThread));
235
+ finalizerData->threads .push_back (std::thread (
236
+ threadEntry, threadId, _tsfn, callCounts[threadId], logThread));
251
237
}
252
238
253
-
254
239
return String::New (env, " started" );
255
240
};
256
241
257
- // TSFN finalizer. Resolves the Promise returned by `Release()` above.
258
- static void Finalizer (Napi::Env env, std::shared_ptr<FinalizerDataType> *finalizeData,
242
+ // TSFN finalizer. Joins the threads and resolves the Promise returned by
243
+ // `Release()` above.
244
+ static void Finalizer (Napi::Env env, SharedFinalizerDataType *finalizeData,
259
245
Context *ctx) {
260
- // for (auto thread : finalizeData->threads) {
261
246
247
+ if (ctx->logThread ) {
248
+ std::cout << " Finalizer joining threads\n " ;
249
+ }
262
250
for (auto &thread : (*finalizeData)->threads ) {
263
- std::cout << " Finalizer joining thread\n " ;
264
251
if (thread.joinable ()) {
265
252
thread.join ();
266
253
}
267
254
}
255
+ ctx->clearTSFN ();
256
+ if (ctx->logThread ) {
257
+ std::cout << " Finished\n " ;
258
+ }
268
259
260
+ (*finalizeData)->deferred ->Resolve (Boolean::New (env, true ));
269
261
delete finalizeData;
270
-
271
- // }
272
- // if (deferred->get()) {
273
- // (*deferred)->Resolve(Boolean::New(env, true));
274
- // deferred->release();
275
-
276
- // }
277
262
}
278
263
279
264
Napi::Value Release (const CallbackInfo &info) {
@@ -282,21 +267,24 @@ class TSFNWrap : public base {
282
267
}
283
268
// return finalizerData->deferred.Promise();
284
269
auto env = info.Env ();
285
- finalizerData->deferred .reset (new Promise::Deferred (Promise::Deferred::New (env)));
270
+ finalizerData->deferred .reset (
271
+ new Promise::Deferred (Promise::Deferred::New (env)));
286
272
_tsfn.Release ();
287
273
return finalizerData->deferred ->Promise ();
288
274
};
289
275
290
276
Napi::Value Call (const CallbackInfo &info) {
291
277
// auto *callData = new DataType(info.Env());
292
- // _tsfn.NonBlockingCall(callData);
293
- // return callData->Promise();
278
+ // _tsfn.NonBlockingCall(callData); return callData->Promise();
294
279
return info.Env ().Undefined ();
295
280
};
296
281
297
282
Napi::Value GetContext (const CallbackInfo &) {
298
283
return _tsfn.GetContext ()->Value ();
299
284
};
285
+
286
+ // This does not run on the node thread.
287
+ void clearTSFN () { _tsfn = TSFN (); }
300
288
};
301
289
} // namespace example
302
290
0 commit comments