Skip to content

Commit c20685b

Browse files
committed
test: wip with example test
1 parent 44adeea commit c20685b

File tree

3 files changed

+86
-48
lines changed

3 files changed

+86
-48
lines changed

test/threadsafe_function_ex/test/example.cc

Lines changed: 61 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,22 @@ class TSFNWrap;
4343
// Context of the TSFN.
4444
using Context = TSFNWrap;
4545

46-
// Data passed (as pointer) to [Non]BlockingCall
47-
using DataType = std::unique_ptr<std::promise<int>>;
46+
struct Data {
47+
// Data passed (as pointer) to [Non]BlockingCall
48+
std::promise<int> promise;
49+
uint32_t base;
50+
};
51+
using DataType = std::unique_ptr<Data>;
4852

4953
// CallJs callback function
5054
static void CallJs(Napi::Env env, Napi::Function /*jsCallback*/,
51-
Context *context, DataType *data) {
52-
if (data != nullptr) {
55+
Context *context, DataType *dataPtr) {
56+
if (dataPtr != nullptr) {
57+
auto &data = *dataPtr;
5358
if (env != nullptr) {
54-
(*data)->set_value(clock());
59+
data->promise.set_value(data->base * data->base);
5560
} else {
56-
(*data)->set_exception(std::make_exception_ptr(
61+
data->promise.set_exception(std::make_exception_ptr(
5762
std::runtime_error("TSFN has been finalized.")));
5863
}
5964
}
@@ -92,42 +97,54 @@ class TSFNWrap : public base {
9297
}
9398
}
9499

95-
struct FinalizerDataType {
100+
struct FinalizerData {
96101
std::vector<std::thread> threads;
97102
std::unique_ptr<Promise::Deferred> deferred;
98103
};
99104

100105
// The finalizer data is shared, because we want to join the threads if our
101106
// TSFNWrap object gets garbage-collected and there are still active threads.
102-
using SharedFinalizerDataType = std::shared_ptr<FinalizerDataType>;
107+
using FinalizerDataType = std::shared_ptr<FinalizerData>;
103108

109+
#define THREADLOG(X) if (context->logThread) {\
110+
std::cout << X;\
111+
}
104112
static void threadEntry(size_t threadId, TSFN tsfn, uint32_t callCount,
105-
bool logThread) {
113+
Context *context) {
106114
using namespace std::chrono_literals;
115+
116+
THREADLOG("Thread " << threadId << " starting...\n")
117+
107118
for (auto i = 0U; i < callCount; ++i) {
108-
auto promise = std::make_unique<std::promise<int>>();
109-
tsfn.NonBlockingCall(&promise);
110-
auto future = promise->get_future();
119+
auto data = std::make_unique<Data>();
120+
data->base = threadId + 1;
121+
THREADLOG("Thread " << threadId << " making call, base = " << data->base << "\n")
122+
123+
tsfn.NonBlockingCall(&data);
124+
auto future = data->promise.get_future();
111125
auto result = future.get();
112-
if (logThread) {
113-
std::cout << "Thread " << threadId << " got result " << result << "\n";
114-
}
115-
}
116-
if (logThread) {
117-
std::cout << "Thread " << threadId << "finished\n";
126+
context->callSucceeded(result);
127+
THREADLOG("Thread " << threadId << " got result: " << result << "\n")
118128
}
129+
130+
THREADLOG("Thread " << threadId << " finished.\n\n")
119131
tsfn.Release();
120132
}
133+
#undef THREADLOG
121134

122-
static std::array<ClassPropertyDescriptor<TSFNWrap>, 3> InstanceMethods() {
123-
return {InstanceMethod("call", &TSFNWrap::Call),
135+
static std::array<ClassPropertyDescriptor<TSFNWrap>, 4> InstanceMethods() {
136+
return {InstanceMethod("getContext", &TSFNWrap::GetContext),
124137
InstanceMethod("start", &TSFNWrap::Start),
138+
InstanceMethod("callCount", &TSFNWrap::CallCount),
125139
InstanceMethod("release", &TSFNWrap::Release)};
126140
}
127141

128142
bool cppExceptions = false;
129143
bool logThread;
130-
std::shared_ptr<FinalizerDataType> finalizerData;
144+
std::atomic_uint succeededCalls;
145+
std::atomic_int aggregate;
146+
147+
FinalizerDataType finalizerData;
131148

132149
Napi::Value Start(const CallbackInfo &info) {
133150
Napi::Env env = info.Env();
@@ -143,7 +160,7 @@ class TSFNWrap : public base {
143160
// The JS-provided callback to execute for each call (if provided)
144161
Function callback;
145162

146-
finalizerData = std::make_shared<FinalizerDataType>();
163+
finalizerData = std::make_shared<FinalizerData>();
147164

148165
logThread = DefaultOptions.logThread;
149166
bool logCall = DefaultOptions.logCall;
@@ -217,8 +234,10 @@ class TSFNWrap : public base {
217234

218235
const auto threadCount = callCounts.size();
219236

220-
auto *finalizerDataPtr = new SharedFinalizerDataType(finalizerData);
237+
auto *finalizerDataPtr = new FinalizerDataType(finalizerData);
221238

239+
succeededCalls = 0;
240+
aggregate = 0;
222241
_tsfn = TSFN::New(
223242
env, // napi_env env,
224243
TSFN::DefaultFunctionFactory(env), // const Function& callback,
@@ -232,16 +251,16 @@ class TSFNWrap : public base {
232251
);
233252

234253
for (auto threadId = 0U; threadId < threadCount; ++threadId) {
235-
finalizerData->threads.push_back(std::thread(
236-
threadEntry, threadId, _tsfn, callCounts[threadId], logThread));
254+
finalizerData->threads.push_back(std::thread(threadEntry, threadId, _tsfn,
255+
callCounts[threadId], this));
237256
}
238257

239258
return String::New(env, "started");
240259
};
241260

242261
// TSFN finalizer. Joins the threads and resolves the Promise returned by
243262
// `Release()` above.
244-
static void Finalizer(Napi::Env env, SharedFinalizerDataType *finalizeData,
263+
static void Finalizer(Napi::Env env, FinalizerDataType *finalizeData,
245264
Context *ctx) {
246265

247266
if (ctx->logThread) {
@@ -254,7 +273,7 @@ class TSFNWrap : public base {
254273
}
255274
ctx->clearTSFN();
256275
if (ctx->logThread) {
257-
std::cout << "Finished\n";
276+
std::cout << "Finished finalizing threads.\n";
258277
}
259278

260279
(*finalizeData)->deferred->Resolve(Boolean::New(env, true));
@@ -265,26 +284,33 @@ class TSFNWrap : public base {
265284
if (finalizerData->deferred) {
266285
return finalizerData->deferred->Promise();
267286
}
268-
// return finalizerData->deferred.Promise();
269-
auto env = info.Env();
270287
finalizerData->deferred.reset(
271-
new Promise::Deferred(Promise::Deferred::New(env)));
288+
new Promise::Deferred(Promise::Deferred::New(info.Env())));
272289
_tsfn.Release();
273290
return finalizerData->deferred->Promise();
274291
};
275292

276-
Napi::Value Call(const CallbackInfo &info) {
277-
// auto *callData = new DataType(info.Env());
278-
// _tsfn.NonBlockingCall(callData); return callData->Promise();
279-
return info.Env().Undefined();
293+
Napi::Value CallCount(const CallbackInfo &info) {
294+
Napi::Env env(info.Env());
295+
296+
auto results = Array::New(env, 2);
297+
results.Set("0", Number::New(env, succeededCalls));
298+
results.Set("1", Number::New(env, aggregate));
299+
return results;
280300
};
281301

282302
Napi::Value GetContext(const CallbackInfo &) {
283303
return _tsfn.GetContext()->Value();
284304
};
285305

286-
// This does not run on the node thread.
306+
// This method does not run on the Node thread.
287307
void clearTSFN() { _tsfn = TSFN(); }
308+
309+
// This method does not run on the Node thread.
310+
void callSucceeded(int result) {
311+
succeededCalls++;
312+
aggregate += result;
313+
}
288314
};
289315
} // namespace example
290316

test/threadsafe_function_ex/test/example.js

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,31 @@ const { TestRunner } = require('../util/TestRunner');
88
class ExampleTest extends TestRunner {
99

1010
async example({ TSFNWrap }) {
11-
const ctx = {};
12-
const tsfn = new TSFNWrap(ctx);
13-
const run = async (i) => {
14-
const result = tsfn.start({threads:[1]});
15-
await result;
16-
return await tsfn.release();
17-
};
18-
const results = [ await run(1) ];
19-
results.push( await run(2) );
20-
return results;
21-
// return await Promise.all( [ run(1), run(2) ] );
22-
// await run(2);
11+
const tsfn = new TSFNWrap();
12+
13+
const threads = [1]; //, 2, 2, 5, 12];
14+
const started = await tsfn.start({ threads, logThread: true });
15+
16+
/**
17+
* Calculate the expected results.
18+
*/
19+
const expected = threads.reduce((p, threadCallCount, threadId) => (
20+
++threadId,
21+
p[0] += threadCallCount,
22+
p[1] += threadCallCount * threadId ** 2,
23+
p
24+
), [0, 0]);
25+
26+
if (started) {
27+
const released = await tsfn.release();
28+
const [callCountActual, aggregateActual] = tsfn.callCount();
29+
const [callCountExpected, aggregateExpected] = expected;
30+
assert(callCountActual == callCountExpected, `The number of calls do not match: actual = ${callCountActual}, expected = ${callCountExpected}`);
31+
assert(aggregateActual == aggregateExpected, `The aggregate of calls do not match: actual = ${aggregateActual}, expected = ${aggregateExpected}`);
32+
return expected;
33+
} else {
34+
throw new Error('The TSFN failed to start');
35+
}
2336
}
2437

2538
}

test/threadsafe_function_ex/util/TestRunner.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ class TestRunner {
5656
// Run tests in both except and noexcept
5757
await this.run(false);
5858
await this.run(true);
59-
console.log("ALL DONE");
6059
} catch (ex) {
6160
console.error(`Test failed!`, ex);
6261
process.exit(1);

0 commit comments

Comments
 (0)