Skip to content

Commit c41eb18

Browse files
committed
Use FinalWaitTask in FWCore/Concurrency unit tests
The FinalWaitTask is more robust than doing a while loop over the tbb_group::wait call. Also added SAFE_REQUIRE to protect the call to REQUIRE in more places.
1 parent e152e7e commit c41eb18

File tree

3 files changed

+145
-181
lines changed

3 files changed

+145
-181
lines changed

FWCore/Concurrency/test/test2_catch2_limitedtaskqueue.cc

Lines changed: 59 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -11,39 +11,49 @@
1111
#include <memory>
1212
#include <atomic>
1313
#include <thread>
14+
#include <mutex>
1415
#include "oneapi/tbb/task_arena.h"
1516
#include "FWCore/Concurrency/interface/WaitingTask.h"
17+
#include "FWCore/Concurrency/interface/FinalWaitingTask.h"
18+
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
1619
#include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
1720
#include "FWCore/Concurrency/interface/FunctorTask.h"
1821

1922
using namespace std::chrono_literals;
2023

24+
namespace {
25+
std::mutex g_requiresMutex;
26+
27+
}
28+
//catch2 REQUIRE is not thread safe
29+
#define SAFE_REQUIRE(__var__) \
30+
{ \
31+
std::lock_guard g{g_requiresMutex}; \
32+
REQUIRE(__var__); \
33+
}
34+
2135
TEST_CASE("LimitedTaskQueue", "[LimitedTaskQueue]") {
2236
SECTION("push") {
2337
{
2438
std::atomic<unsigned int> count{0};
2539
edm::LimitedTaskQueue queue{1};
2640
{
27-
std::atomic<int> waitingTasks{3};
2841
oneapi::tbb::task_group group;
29-
queue.push(group, [&count, &waitingTasks] {
42+
edm::FinalWaitingTask lastTask(group);
43+
edm::WaitingTaskHolder waitingTask(group, &lastTask);
44+
queue.push(group, [&count, waitingTask] {
3045
REQUIRE(count++ == 0u);
3146
std::this_thread::sleep_for(10us);
32-
--waitingTasks;
3347
});
34-
queue.push(group, [&count, &waitingTasks] {
48+
queue.push(group, [&count, waitingTask] {
3549
REQUIRE(count++ == 1u);
3650
std::this_thread::sleep_for(10us);
37-
--waitingTasks;
3851
});
39-
queue.push(group, [&count, &waitingTasks] {
52+
queue.push(group, [&count, lastTask = std::move(waitingTask)] {
4053
REQUIRE(count++ == 2u);
4154
std::this_thread::sleep_for(10us);
42-
--waitingTasks;
4355
});
44-
do {
45-
group.wait();
46-
} while (0 != waitingTasks.load());
56+
lastTask.wait();
4757
REQUIRE(count == 3u);
4858
}
4959
}
@@ -52,29 +62,25 @@ TEST_CASE("LimitedTaskQueue", "[LimitedTaskQueue]") {
5262
constexpr unsigned int kMax = 2;
5363
edm::LimitedTaskQueue queue{kMax};
5464
{
55-
std::atomic<int> waitingTasks{3};
5665
oneapi::tbb::task_group group;
57-
queue.push(group, [&count, &waitingTasks, kMax] {
58-
REQUIRE(count++ < kMax);
66+
edm::FinalWaitingTask lastTask(group);
67+
edm::WaitingTaskHolder waitingTask(group, &lastTask);
68+
queue.push(group, [&count, waitingTask, kMax] {
69+
SAFE_REQUIRE(count++ < kMax);
5970
std::this_thread::sleep_for(10us);
6071
--count;
61-
--waitingTasks;
6272
});
63-
queue.push(group, [&count, &waitingTasks, kMax] {
64-
REQUIRE(count++ < kMax);
73+
queue.push(group, [&count, waitingTask, kMax] {
74+
SAFE_REQUIRE(count++ < kMax);
6575
std::this_thread::sleep_for(10us);
6676
--count;
67-
--waitingTasks;
6877
});
69-
queue.push(group, [&count, &waitingTasks, kMax] {
70-
REQUIRE(count++ < kMax);
78+
queue.push(group, [&count, lastTask = std::move(waitingTask), kMax] {
79+
SAFE_REQUIRE(count++ < kMax);
7180
std::this_thread::sleep_for(10us);
7281
--count;
73-
--waitingTasks;
7482
});
75-
do {
76-
group.wait();
77-
} while (0 != waitingTasks);
83+
lastTask.wait();
7884
REQUIRE(count == 0u);
7985
}
8086
}
@@ -85,48 +91,45 @@ TEST_CASE("LimitedTaskQueue", "[LimitedTaskQueue]") {
8591
edm::LimitedTaskQueue queue{1};
8692
{
8793
{
88-
std::atomic<int> waitingTasks{3};
8994
oneapi::tbb::task_group group;
95+
edm::FinalWaitingTask lastTask(group);
96+
edm::WaitingTaskHolder waitingTask(group, &lastTask);
97+
9098
edm::LimitedTaskQueue::Resumer resumer;
9199
std::atomic<bool> resumerSet{false};
92100
std::exception_ptr e1;
93-
queue.pushAndPause(
94-
group, [&resumer, &resumerSet, &count, &waitingTasks, &e1](edm::LimitedTaskQueue::Resumer iResumer) {
95-
resumer = std::move(iResumer);
96-
resumerSet = true;
97-
try {
98-
REQUIRE(++count == 1u);
99-
} catch (...) {
100-
e1 = std::current_exception();
101-
}
102-
--waitingTasks;
103-
});
101+
queue.pushAndPause(group,
102+
[&resumer, &resumerSet, &count, waitingTask, &e1](edm::LimitedTaskQueue::Resumer iResumer) {
103+
resumer = std::move(iResumer);
104+
resumerSet = true;
105+
try {
106+
SAFE_REQUIRE(++count == 1u);
107+
} catch (...) {
108+
e1 = std::current_exception();
109+
}
110+
});
104111
std::exception_ptr e2;
105-
queue.push(group, [&count, &waitingTasks, &e2] {
112+
queue.push(group, [&count, waitingTask, &e2] {
106113
try {
107-
REQUIRE(++count == 2u);
114+
SAFE_REQUIRE(++count == 2u);
108115
} catch (...) {
109116
e2 = std::current_exception();
110117
}
111-
--waitingTasks;
112118
});
113119
std::exception_ptr e3;
114-
queue.push(group, [&count, &waitingTasks, &e3] {
120+
queue.push(group, [&count, lastTask = std::move(waitingTask), &e3] {
115121
try {
116-
REQUIRE(++count == 3u);
122+
SAFE_REQUIRE(++count == 3u);
117123
} catch (...) {
118124
e3 = std::current_exception();
119125
}
120-
--waitingTasks;
121126
});
122127
std::this_thread::sleep_for(100us);
123128
REQUIRE(2u >= count);
124129
while (not resumerSet) {
125130
}
126-
REQUIRE(resumer.resume());
127-
do {
128-
group.wait();
129-
} while (0 != waitingTasks.load());
131+
SAFE_REQUIRE(resumer.resume());
132+
lastTask.wait();
130133
REQUIRE(count == 3u);
131134
if (e1) {
132135
std::rethrow_exception(e1);
@@ -147,54 +150,46 @@ TEST_CASE("LimitedTaskQueue", "[LimitedTaskQueue]") {
147150
edm::LimitedTaskQueue queue{kMax};
148151
unsigned int index = 100;
149152
const unsigned int nTasks = 1000;
150-
//catch2 REQUIRE is not thread safe
151-
std::mutex mutex;
152153
while (0 != --index) {
153-
std::atomic<int> waiting{1};
154+
edm::FinalWaitingTask lastTask(group);
155+
154156
std::atomic<unsigned int> count{0};
155157
std::atomic<unsigned int> nRunningTasks{0};
156158
std::atomic<bool> waitToStart{true};
157159
{
158-
group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks, &mutex, kMax] {
160+
edm::WaitingTaskHolder waitingTask(group, &lastTask);
161+
162+
group.run([&queue, &waitToStart, &group, waitingTask, &count, &nRunningTasks, kMax] {
159163
while (waitToStart) {
160164
}
161165
for (unsigned int i = 0; i < nTasks; ++i) {
162-
++waiting;
163-
queue.push(group, [&count, &waiting, &nRunningTasks, &mutex, kMax] {
164-
std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
166+
queue.push(group, [&count, waitingTask, &nRunningTasks, kMax] {
165167
auto nrt = nRunningTasks++;
166168
if (nrt >= kMax) {
167169
std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
168-
std::lock_guard lock{mutex};
169-
REQUIRE(nrt < kMax);
170+
SAFE_REQUIRE(nrt < kMax);
170171
}
171172
++count;
172173
--nRunningTasks;
173174
});
174175
}
175176
});
176-
group.run([&queue, &waitToStart, &group, &waiting, &count, &nRunningTasks, &mutex, kMax] {
177+
group.run([&queue, &waitToStart, &group, waitingTask, &count, &nRunningTasks, kMax] {
177178
waitToStart = false;
178179
for (unsigned int i = 0; i < nTasks; ++i) {
179-
++waiting;
180-
queue.push(group, [&count, &waiting, &nRunningTasks, &mutex, kMax] {
181-
std::shared_ptr<std::atomic<int>> guardAgain{&waiting, [](auto* v) { --(*v); }};
180+
queue.push(group, [&count, waitingTask, &nRunningTasks, kMax] {
182181
auto nrt = nRunningTasks++;
183182
if (nrt >= kMax) {
184183
std::cout << "ERROR " << nRunningTasks << " >= " << kMax << std::endl;
185-
std::lock_guard lock{mutex};
186-
REQUIRE(nrt < kMax);
184+
SAFE_REQUIRE(nrt < kMax);
187185
}
188186
++count;
189187
--nRunningTasks;
190188
});
191189
}
192-
--waiting;
193190
});
194191
}
195-
do {
196-
group.wait();
197-
} while (0 != waiting.load());
192+
lastTask.wait();
198193
REQUIRE(nRunningTasks == 0u);
199194
REQUIRE(2 * nTasks == count);
200195
}

FWCore/Concurrency/test/test2_catch2_serialtaskqueue.cc

Lines changed: 41 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include <thread>
1616
#include "oneapi/tbb/task_arena.h"
1717
#include "FWCore/Concurrency/interface/WaitingTask.h"
18+
#include "FWCore/Concurrency/interface/FinalWaitingTask.h"
19+
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
1820
#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
1921
#include "FWCore/Concurrency/interface/FunctorTask.h"
2022

@@ -25,26 +27,24 @@ TEST_CASE("SerialTaskQueue", "[SerialTaskQueue]") {
2527
std::atomic<unsigned int> count{0};
2628
edm::SerialTaskQueue queue;
2729
{
28-
std::atomic<unsigned int> waitingTasks{3};
2930
oneapi::tbb::task_group group;
30-
queue.push(group, [&count, &waitingTasks] {
31-
REQUIRE(count++ == 0u);
32-
std::this_thread::sleep_for(10us);
33-
--waitingTasks;
34-
});
35-
queue.push(group, [&count, &waitingTasks] {
36-
REQUIRE(count++ == 1u);
37-
std::this_thread::sleep_for(10us);
38-
--waitingTasks;
39-
});
40-
queue.push(group, [&count, &waitingTasks] {
41-
REQUIRE(count++ == 2u);
42-
std::this_thread::sleep_for(10us);
43-
--waitingTasks;
44-
});
45-
do {
46-
group.wait();
47-
} while (0 != waitingTasks.load());
31+
edm::FinalWaitingTask lastTask(group);
32+
{
33+
edm::WaitingTaskHolder waitingTask(group, &lastTask);
34+
queue.push(group, [&count, waitingTask] {
35+
REQUIRE(count++ == 0u);
36+
std::this_thread::sleep_for(10us);
37+
});
38+
queue.push(group, [&count, waitingTask] {
39+
REQUIRE(count++ == 1u);
40+
std::this_thread::sleep_for(10us);
41+
});
42+
queue.push(group, [&count, waitingTask] {
43+
REQUIRE(count++ == 2u);
44+
std::this_thread::sleep_for(10us);
45+
});
46+
}
47+
lastTask.wait();
4848
REQUIRE(count == 3u);
4949
}
5050
}
@@ -55,42 +55,32 @@ TEST_CASE("SerialTaskQueue", "[SerialTaskQueue]") {
5555
{
5656
queue.pause();
5757
{
58-
std::atomic<unsigned int> waitingTasks{1};
5958
oneapi::tbb::task_group group;
60-
queue.push(group, [&count, &waitingTasks] {
61-
REQUIRE(count++ == 0u);
62-
--waitingTasks;
63-
});
59+
edm::FinalWaitingTask lastTask(group);
60+
61+
queue.push(group, [&count, waitingTask = edm::WaitingTaskHolder(group, &lastTask)] { REQUIRE(count++ == 0u); });
6462
std::this_thread::sleep_for(1000us);
6563
REQUIRE(count == 0u);
6664
queue.resume();
67-
do {
68-
group.wait();
69-
} while (0 != waitingTasks.load());
65+
lastTask.wait();
7066
REQUIRE(count == 1u);
7167
}
7268
{
73-
std::atomic<unsigned int> waitingTasks{3};
7469
oneapi::tbb::task_group group;
75-
queue.push(group, [&count, &queue, &waitingTasks] {
76-
queue.pause();
77-
REQUIRE(count++ == 1u);
78-
--waitingTasks;
79-
});
80-
queue.push(group, [&count, &waitingTasks] {
81-
REQUIRE(count++ == 2u);
82-
--waitingTasks;
83-
});
84-
queue.push(group, [&count, &waitingTasks] {
85-
REQUIRE(count++ == 3u);
86-
--waitingTasks;
87-
});
70+
edm::FinalWaitingTask lastTask(group);
71+
{
72+
edm::WaitingTaskHolder waitingTask(group, &lastTask);
73+
queue.push(group, [&count, &queue, waitingTask] {
74+
queue.pause();
75+
REQUIRE(count++ == 1u);
76+
});
77+
queue.push(group, [&count, waitingTask] { REQUIRE(count++ == 2u); });
78+
queue.push(group, [&count, waitingTask] { REQUIRE(count++ == 3u); });
79+
}
8880
std::this_thread::sleep_for(100us);
8981
REQUIRE(2u >= count);
9082
queue.resume();
91-
do {
92-
group.wait();
93-
} while (0 != waitingTasks.load());
83+
lastTask.wait();
9484
REQUIRE(count == 4u);
9585
}
9686
}
@@ -102,37 +92,26 @@ TEST_CASE("SerialTaskQueue", "[SerialTaskQueue]") {
10292
unsigned int index = 100;
10393
const unsigned int nTasks = 1000;
10494
while (0 != --index) {
105-
std::atomic<unsigned int> waitingTasks{2};
95+
edm::FinalWaitingTask lastTask(group);
10696
std::atomic<unsigned int> count{0};
10797
std::atomic<bool> waitToStart{true};
10898
{
109-
group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
99+
edm::WaitingTaskHolder waitingTask(group, &lastTask);
100+
group.run([&queue, &waitToStart, waitingTask, &count, &group] {
110101
while (waitToStart.load()) {
111102
}
112103
for (unsigned int i = 0; i < nTasks; ++i) {
113-
++waitingTasks;
114-
queue.push(group, [&count, &waitingTasks] {
115-
++count;
116-
--waitingTasks;
117-
});
104+
queue.push(group, [&count, waitingTask] { ++count; });
118105
}
119-
--waitingTasks;
120106
});
121-
group.run([&queue, &waitToStart, &waitingTasks, &count, &group] {
107+
group.run([&queue, &waitToStart, waitingTask, &count, &group] {
122108
waitToStart = false;
123109
for (unsigned int i = 0; i < nTasks; ++i) {
124-
++waitingTasks;
125-
queue.push(group, [&count, &waitingTasks] {
126-
++count;
127-
--waitingTasks;
128-
});
110+
queue.push(group, [&count, waitingTask] { ++count; });
129111
}
130-
--waitingTasks;
131112
});
132113
}
133-
do {
134-
group.wait();
135-
} while (0 != waitingTasks.load());
114+
lastTask.wait();
136115
REQUIRE(2 * nTasks == count);
137116
}
138117
}

0 commit comments

Comments
 (0)