Skip to content

Commit 86b364d

Browse files
committed
Remove Status from parallel_functions.
1 parent 41574c8 commit 86b364d

File tree

1 file changed

+38
-115
lines changed

1 file changed

+38
-115
lines changed

tiledb/sm/misc/parallel_functions.h

Lines changed: 38 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -91,20 +91,20 @@ void parallel_sort(
9191

9292
// Define a work routine that encapsulates steps #1 - #3 in the
9393
// algorithm.
94-
std::function<Status(uint64_t, IterT, IterT)> quick_sort;
95-
quick_sort = [&](const uint64_t depth, IterT begin, IterT end) -> Status {
94+
std::function<void(uint64_t, IterT, IterT)> quick_sort;
95+
quick_sort = [&](const uint64_t depth, IterT begin, IterT end) {
9696
const size_t elements = std::distance(begin, end);
9797

9898
// Stop the recursion if this subrange does not contain
9999
// any elements to sort.
100100
if (elements <= 1) {
101-
return Status::Ok();
101+
return;
102102
}
103103

104104
// If there are only two elements remaining, directly sort them.
105105
if (elements <= 2) {
106106
std::sort(begin, end, cmp);
107-
return Status::Ok();
107+
return;
108108
}
109109

110110
// If we have reached the target height of the call stack tree,
@@ -114,7 +114,7 @@ void parallel_sort(
114114
// evenly distributed among them.
115115
if (depth + 1 == height) {
116116
std::sort(begin, end, cmp);
117-
return Status::Ok();
117+
return;
118118
}
119119

120120
// Step #1: Pick a pivot value in the range.
@@ -136,63 +136,43 @@ void parallel_sort(
136136
// Step #3: Recursively sort the left and right partitions.
137137
std::vector<ThreadPool::Task> tasks;
138138
if (begin != middle) {
139-
std::function<Status()> quick_sort_left =
140-
std::bind(quick_sort, depth + 1, begin, middle);
141-
ThreadPool::Task left_task = tp->execute(std::move(quick_sort_left));
142-
tasks.emplace_back(std::move(left_task));
139+
tasks.emplace_back(tp->execute(quick_sort, depth + 1, begin, middle));
143140
}
144141
if (middle != end) {
145-
std::function<Status()> quick_sort_right =
146-
std::bind(quick_sort, depth + 1, middle + 1, end);
147-
ThreadPool::Task right_task = tp->execute(std::move(quick_sort_right));
148-
tasks.emplace_back(std::move(right_task));
142+
tasks.emplace_back(tp->execute(quick_sort, depth + 1, middle + 1, end));
149143
}
150144

151145
// Wait for the sorted partitions.
152-
return tp->wait_all(tasks);
146+
tp->wait_all(tasks);
153147
};
154148

155149
// Start the quicksort from the entire range.
156-
throw_if_not_ok(quick_sort(0, begin, end));
150+
quick_sort(0, begin, end);
157151
}
158152

159153
/**
160154
* Call the given function on each element in the given iterator range.
161155
*
162156
* @tparam IterT Iterator type
163-
* @tparam FuncT Function type (returning Status).
157+
* @tparam FuncT Function type.
164158
* @param tp The threadpool to use.
165159
* @param begin Beginning of range (inclusive).
166160
* @param end End of range (exclusive).
167161
* @param F Function to call on each item
168-
* @return Status
169162
*/
170163
template <typename FuncT>
171-
Status parallel_for(
172-
ThreadPool* const tp, uint64_t begin, uint64_t end, const FuncT& F) {
164+
void parallel_for(
165+
ThreadPool* const tp, uint64_t begin, uint64_t end, const FuncT& F)
166+
requires std::same_as<std::invoke_result_t<FuncT, uint64_t>, void>
167+
{
173168
assert(begin <= end);
174169

175170
const uint64_t range_len = end - begin;
176171
if (range_len == 0)
177-
return Status::Ok();
172+
return;
178173

179174
assert(tp);
180175

181-
/*
182-
* Mutex protects atomicity of `failed*` local variables together. The first
183-
* subrange to fail determines the return or exception value.
184-
*/
185-
std::mutex failed_mutex;
186-
/*
187-
* If we were checking this variable inside either the main loop or the one in
188-
* `execute_subrange`, then it would be better to use `atomic_bool` to lessen
189-
* the lock overhead on the mutex. As it is, we do not prematurely stop any
190-
* loop that is not the first to fail.
191-
*/
192-
bool failed = false;
193-
optional<std::exception_ptr> failed_exception{nullopt};
194-
optional<Status> failed_status{nullopt};
195-
196176
/*
197177
* Executes subrange [subrange_start, subrange_end) that exists within the
198178
* range [begin, end).
@@ -203,43 +183,12 @@ Status parallel_for(
203183
 * throws, the exception propagates out through the threadpool task. `F` now
204184
 * returns void, so there is no Status value to collect or return.
205185
 */
206-
std::function<Status(uint64_t, uint64_t)> execute_subrange =
207-
[&failed, &failed_exception, &failed_status, &failed_mutex, &F](
208-
const uint64_t subrange_start,
209-
const uint64_t subrange_end) -> Status {
210-
for (uint64_t i = subrange_start; i < subrange_end; ++i) {
211-
Status st;
212-
try {
213-
st = F(i);
214-
if (st.ok()) {
215-
continue;
216-
}
217-
std::lock_guard<std::mutex> lock(failed_mutex);
218-
if (!failed) {
219-
failed_status = st;
220-
failed = true;
221-
return st;
186+
auto execute_subrange =
187+
[&F](const uint64_t subrange_start, const uint64_t subrange_end) {
188+
for (uint64_t i = subrange_start; i < subrange_end; ++i) {
189+
F(i);
222190
}
223-
} catch (...) {
224-
std::lock_guard<std::mutex> lock(failed_mutex);
225-
if (!failed) {
226-
auto ce{std::current_exception()};
227-
failed_exception = ce;
228-
failed = true;
229-
std::rethrow_exception(ce);
230-
}
231-
}
232-
/*
233-
* If we reach this line, then either the status was not OK or `F` threw.
234-
* Now you'd think that we'd do something other than continue the loop in
235-
* this case, like `break` and end the function. Nope. That's not the
236-
* legacy behavior of this function. Nor is checking failure status in the
237-
* loop that kicks off this function. Regardless, we are leaving the
238-
* behavior exactly the same for now.
239-
*/
240-
}
241-
return Status{};
242-
};
191+
};
243192

244193
// Calculate the length of the subrange that each thread will
245194
// be responsible for.
@@ -261,48 +210,38 @@ Status parallel_for(
261210

262211
const uint64_t subrange_start = begin + fn_iter;
263212
const uint64_t subrange_end = begin + fn_iter + task_subrange_len;
264-
std::function<Status()> bound_fn =
265-
std::bind(execute_subrange, subrange_start, subrange_end);
266-
tasks.emplace_back(tp->execute(std::move(bound_fn)));
213+
tasks.emplace_back(
214+
tp->execute(execute_subrange, subrange_start, subrange_end));
267215

268216
fn_iter += task_subrange_len;
269217
}
270218

271219
// Wait for all instances of `execute_subrange` to complete.
272-
// This is ignoring the wait status as we use failed_exception for propagating
273-
// the tasks exceptions.
274-
(void)tp->wait_all(tasks);
275-
276-
if (failed_exception.has_value()) {
277-
std::rethrow_exception(failed_exception.value());
278-
}
279-
if (failed_status.has_value()) {
280-
return failed_status.value();
281-
}
282-
return Status{}; // otherwise return OK
220+
tp->wait_all(tasks);
283221
}
284222

285223
/**
286224
* Call the given function on every pair (i, j) in the given i and j ranges,
287225
* possibly in parallel.
288226
*
289-
* @tparam FuncT Function type (returning Status).
227+
* @tparam FuncT Function type.
290228
* @param tp The threadpool to use.
291229
* @param i0 Inclusive start of outer (rows) range.
292230
* @param i1 Exclusive end of outer range.
293231
* @param j0 Inclusive start of inner (cols) range.
294232
* @param j1 Exclusive end of inner range.
295233
* @param F Function to call on each (i, j) pair.
296-
* @return Status
297234
*/
298235
template <typename FuncT>
299-
Status parallel_for_2d(
236+
void parallel_for_2d(
300237
ThreadPool* const tp,
301238
uint64_t i0,
302239
uint64_t i1,
303240
uint64_t j0,
304241
uint64_t j1,
305-
const FuncT& F) {
242+
const FuncT& F)
243+
requires std::same_as<std::invoke_result_t<FuncT, uint64_t, uint64_t>, void>
244+
{
306245
assert(i0 <= i1);
307246
assert(j0 <= j1);
308247

@@ -312,11 +251,7 @@ Status parallel_for_2d(
312251
const uint64_t range_len_j = j1 - j0;
313252

314253
if (range_len_i == 0 || range_len_j == 0)
315-
return Status::Ok();
316-
317-
bool failed = false;
318-
Status return_st = Status::Ok();
319-
std::mutex return_st_mutex;
254+
return;
320255

321256
// Calculate the length of the subrange-i and subrange-j that
322257
// each thread will be responsible for.
@@ -328,24 +263,16 @@ Status parallel_for_2d(
328263

329264
// Executes subarray [begin_i, end_i) x [start_j, end_j) within the
330265
// array [i0, i1) x [j0, j1).
331-
std::function<Status(uint64_t, uint64_t, uint64_t, uint64_t)>
332-
execute_subrange_ij = [&failed, &return_st, &return_st_mutex, &F](
333-
const uint64_t begin_i,
334-
const uint64_t end_i,
335-
const uint64_t start_j,
336-
const uint64_t end_j) -> Status {
266+
auto execute_subrange_ij = [&F](
267+
const uint64_t begin_i,
268+
const uint64_t end_i,
269+
const uint64_t start_j,
270+
const uint64_t end_j) {
337271
for (uint64_t i = begin_i; i < end_i; ++i) {
338272
for (uint64_t j = start_j; j < end_j; ++j) {
339-
const Status st = F(i, j);
340-
if (!st.ok() && !failed) {
341-
failed = true;
342-
std::lock_guard<std::mutex> lock(return_st_mutex);
343-
return_st = st;
344-
}
273+
F(i, j);
345274
}
346275
}
347-
348-
return Status::Ok();
349276
};
350277

351278
// Calculate the subranges for each dimension, i and j.
@@ -383,21 +310,17 @@ Status parallel_for_2d(
383310
tasks.reserve(concurrency_level * concurrency_level);
384311
for (const auto& subrange_i : subranges_i) {
385312
for (const auto& subrange_j : subranges_j) {
386-
std::function<Status()> bound_fn = std::bind(
313+
tasks.emplace_back(tp->execute(
387314
execute_subrange_ij,
388315
subrange_i.first,
389316
subrange_i.second,
390317
subrange_j.first,
391-
subrange_j.second);
392-
tasks.emplace_back(tp->execute(std::move(bound_fn)));
318+
subrange_j.second));
393319
}
394320
}
395321

396322
// Wait for all instances of `execute_subrange` to complete.
397-
auto wait_status = tp->wait_all(tasks);
398-
if (!wait_status.ok())
399-
return wait_status;
400-
return return_st;
323+
tp->wait_all(tasks);
401324
}
402325

403326
} // namespace tiledb::sm

0 commit comments

Comments
 (0)