Skip to content

Commit 0242ad9

Browse files
authored
P3481 R2 - Incorporate changes from the "bulk issues" paper (NVIDIA#1500)
Add an execution policy parameter to `bulk`. As execution policies don't come by default with libc++, also added variants for them in stdexec. The functor passed to `bulk` needs to be copy-constructible. Store the execution policy inside the bulk operation state (if needed). Add `bulk_chunked` and `bulk_unchunked`. Add tests for `bulk_chunked` and `bulk_unchunked`. Lower `bulk` into `bulk_chunked`. Customize `bulk_chunked` instead of `bulk` for the static thread pool.
1 parent b0b18b8 commit 0242ad9

23 files changed

+1331
-175
lines changed

examples/nvexec/bulk.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ auto main() -> int {
4343

4444
auto snd = ex::transfer_when_all( //
4545
sch,
46-
fork | ex::bulk(4, bulk_fn(1)),
46+
fork | ex::bulk(ex::par, 4, bulk_fn(1)),
4747
fork | ex::then(then_fn(1)),
48-
fork | ex::bulk(4, bulk_fn(2)))
48+
fork | ex::bulk(ex::par, 4, bulk_fn(2)))
4949
| ex::then(then_fn(2));
5050

5151
stdexec::sync_wait(std::move(snd));

examples/nvexec/maxwell/snr.cuh

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -421,8 +421,8 @@ auto maxwell_eqs_snr(
421421
computer,
422422
repeat_n(
423423
n_iterations,
424-
ex::bulk(accessor.cells, update_h(accessor))
425-
| ex::bulk(accessor.cells, update_e(time, dt, accessor))))
424+
ex::bulk(ex::par, accessor.cells, update_h(accessor))
425+
| ex::bulk(ex::par, accessor.cells, update_e(time, dt, accessor))))
426426
| ex::then(dump_vtk(write_results, accessor));
427427
}
428428

@@ -436,7 +436,8 @@ void run_snr(
436436
time_storage_t time{is_gpu_scheduler(computer)};
437437
fields_accessor accessor = grid.accessor();
438438

439-
auto init = ex::just() | exec::on(computer, ex::bulk(grid.cells, grid_initializer(dt, accessor)));
439+
auto init = ex::just()
440+
| exec::on(computer, ex::bulk(ex::par, grid.cells, grid_initializer(dt, accessor)));
440441
stdexec::sync_wait(init);
441442

442443
auto snd = maxwell_eqs_snr(dt, time.get(), write_vtk, n_iterations, accessor, computer);

examples/nvexec/maxwell_distributed.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,8 @@ namespace distributed {
8888
, begin(grid_begin)
8989
, end(grid_end)
9090
, own_cells(end - begin)
91-
, fields_(
92-
device_alloc<float>(
93-
static_cast<std::size_t>(own_cells + n * 2)
94-
* static_cast<int>(field_id::fields_count))) {
91+
, fields_(device_alloc<float>(
92+
static_cast<std::size_t>(own_cells + n * 2) * static_cast<int>(field_id::fields_count))) {
9593
}
9694

9795
[[nodiscard]]
@@ -426,7 +424,7 @@ auto main(int argc, char *argv[]) -> int {
426424

427425
stdexec::sync_wait(
428426
ex::schedule(gpu)
429-
| ex::bulk(accessor.own_cells(), distributed::grid_initializer(dt, accessor)));
427+
| ex::bulk(ex::par, accessor.own_cells(), distributed::grid_initializer(dt, accessor)));
430428

431429
const int prev_rank = rank == 0 ? size - 1 : rank - 1;
432430
const int next_rank = rank == (size - 1) ? 0 : rank + 1;
@@ -481,13 +479,13 @@ auto main(int argc, char *argv[]) -> int {
481479

482480
for (std::size_t compute_step = 0; compute_step < n_iterations; compute_step++) {
483481
auto compute_h = ex::when_all(
484-
ex::just() | exec::on(gpu, ex::bulk(bulk_cells, bulk_h_update)),
485-
ex::just() | exec::on(gpu_with_priority, ex::bulk(border_cells, border_h_update))
482+
ex::just() | exec::on(gpu, ex::bulk(ex::par, bulk_cells, bulk_h_update)),
483+
ex::just() | exec::on(gpu_with_priority, ex::bulk(ex::par, border_cells, border_h_update))
486484
| ex::then(exchange_hx));
487485

488486
auto compute_e = ex::when_all(
489-
ex::just() | exec::on(gpu, ex::bulk(bulk_cells, bulk_e_update)),
490-
ex::just() | exec::on(gpu_with_priority, ex::bulk(border_cells, border_e_update))
487+
ex::just() | exec::on(gpu, ex::bulk(ex::par, bulk_cells, bulk_e_update)),
488+
ex::just() | exec::on(gpu_with_priority, ex::bulk(ex::par, border_cells, border_e_update))
491489
| ex::then(exchange_ez));
492490

493491
stdexec::sync_wait(std::move(compute_h));
@@ -497,14 +495,16 @@ auto main(int argc, char *argv[]) -> int {
497495
write();
498496
#else
499497
for (std::size_t compute_step = 0; compute_step < n_iterations; compute_step++) {
500-
auto compute_h = ex::just()
501-
| exec::on(gpu, ex::bulk(accessor.own_cells(), distributed::update_h(accessor)))
502-
| ex::then(exchange_hx);
498+
auto compute_h =
499+
ex::just()
500+
| exec::on(gpu, ex::bulk(ex::par, accessor.own_cells(), distributed::update_h(accessor)))
501+
| ex::then(exchange_hx);
503502

504503
auto compute_e =
505504
ex::just()
506505
| exec::on(
507-
gpu, ex::bulk(accessor.own_cells(), distributed::update_e(time.get(), dt, accessor)))
506+
gpu,
507+
ex::bulk(ex::par, accessor.own_cells(), distributed::update_e(time.get(), dt, accessor)))
508508
| ex::then(exchange_ez);
509509

510510
stdexec::sync_wait(std::move(compute_h));

examples/nvexec/split.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ auto main() -> int {
3939
};
4040
};
4141

42-
auto fork = ex::schedule(sch) | ex::bulk(4, bulk_fn(0)) | ex::split();
42+
auto fork = ex::schedule(sch) | ex::bulk(ex::par, 4, bulk_fn(0)) | ex::split();
4343

4444
auto snd = ex::transfer_when_all(
4545
sch,
46-
fork | ex::bulk(4, bulk_fn(1)),
46+
fork | ex::bulk(ex::par, 4, bulk_fn(1)),
4747
fork | ex::then(then_fn(1)),
48-
fork | ex::bulk(4, bulk_fn(2)))
48+
fork | ex::bulk(ex::par, 4, bulk_fn(2)))
4949
| ex::then(then_fn(2));
5050

5151
stdexec::sync_wait(std::move(snd));

examples/server_theme/split_bulk.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ auto handle_multi_blur_request(const http_request& req) -> ex::sender auto {
155155
size_t img_count = imgs.size();
156156
// return a sender that bulk-processes the image in parallel
157157
return ex::just(std::move(imgs))
158-
| ex::bulk(img_count, [](size_t i, std::vector<image>& imgs) {
158+
| ex::bulk(ex::par, img_count, [](size_t i, std::vector<image>& imgs) {
159159
imgs[i] = apply_blur(imgs[i]);
160160
});
161161
})

include/exec/__detail/__system_context_default_impl.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ namespace exec::__system_context_default_impl {
185185

186186
using __bulk_schedule_operation_t = __operation<decltype(stdexec::bulk(
187187
stdexec::schedule(std::declval<__pool_scheduler_t>()),
188+
stdexec::par,
188189
std::declval<uint32_t>(),
189190
std::declval<__bulk_functor>()))>;
190191

@@ -205,8 +206,8 @@ namespace exec::__system_context_default_impl {
205206
std::span<std::byte> __storage,
206207
bulk_item_receiver& __r) noexcept override {
207208
try {
208-
auto __sndr =
209-
stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{&__r});
209+
auto __sndr = stdexec::bulk(
210+
stdexec::schedule(__pool_scheduler_), stdexec::par, __size, __bulk_functor{&__r});
210211
auto __os =
211212
__bulk_schedule_operation_t::__construct_maybe_alloc(__storage, &__r, std::move(__sndr));
212213
__os->start();

include/exec/libdispatch_queue.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ namespace exec {
100100
struct transform_bulk {
101101
template <class Data, class Sender>
102102
auto operator()(stdexec::bulk_t, Data &&data, Sender &&sndr) {
103-
auto [shape, fun] = std::forward<Data>(data);
103+
auto [pol, shape, fun] = std::forward<Data>(data);
104+
// TODO: handle non-par execution policies
104105
return bulk_sender_t<Sender, decltype(shape), decltype(fun)>{
105106
queue_, std::forward<Sender>(sndr), shape, std::move(fun)};
106107
}

include/exec/static_thread_pool.hpp

Lines changed: 60 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -175,14 +175,14 @@ namespace exec {
175175
// TODO: code to reconstitute a static_thread_pool_ schedule sender
176176
};
177177

178-
template <class SenderId, std::integral Shape, class Fun>
178+
template <class SenderId, bool parallelize, std::integral Shape, class Fun>
179179
struct bulk_sender {
180180
using Sender = stdexec::__t<SenderId>;
181181
struct __t;
182182
};
183183

184-
template <sender Sender, std::integral Shape, class Fun>
185-
using bulk_sender_t = __t<bulk_sender<__id<__decay_t<Sender>>, Shape, Fun>>;
184+
template <sender Sender, bool parallelize, std::integral Shape, class Fun>
185+
using bulk_sender_t = __t<bulk_sender<__id<__decay_t<Sender>>, parallelize, Shape, Fun>>;
186186

187187
#if STDEXEC_MSVC()
188188
// MSVCBUG https://developercommunity.visualstudio.com/t/Alias-template-with-pack-expansion-in-no/10437850
@@ -195,11 +195,11 @@ namespace exec {
195195
#endif
196196

197197
template <class Fun, class Shape, class... Args>
198-
requires __callable<Fun, Shape, Args&...>
198+
requires __callable<Fun, Shape, Shape, Args&...>
199199
using bulk_non_throwing = //
200200
__mbool<
201201
// If function invocation doesn't throw
202-
__nothrow_callable<Fun, Shape, Args&...> &&
202+
__nothrow_callable<Fun, Shape, Shape, Args&...> &&
203203
// and emplacing a tuple doesn't throw
204204
#if STDEXEC_MSVC()
205205
__bulk_non_throwing<Args...>::__v
@@ -209,36 +209,45 @@ namespace exec {
209209
// there's no need to advertise completion with `exception_ptr`
210210
>;
211211

212-
template <class CvrefSender, class Receiver, class Shape, class Fun, bool MayThrow>
212+
template <class CvrefSender, class Receiver, bool parallelize, class Shape, class Fun, bool MayThrow>
213213
struct bulk_shared_state;
214214

215-
template <class CvrefSenderId, class ReceiverId, class Shape, class Fun, bool MayThrow>
215+
template <
216+
class CvrefSenderId,
217+
class ReceiverId,
218+
bool parallelize,
219+
class Shape,
220+
class Fun,
221+
bool MayThrow>
216222
struct bulk_receiver {
217223
using CvrefSender = __cvref_t<CvrefSenderId>;
218224
using Receiver = stdexec::__t<ReceiverId>;
219225
struct __t;
220226
};
221227

222-
template <class CvrefSender, class Receiver, class Shape, class Fun, bool MayThrow>
223-
using bulk_receiver_t =
224-
__t<bulk_receiver<__cvref_id<CvrefSender>, __id<Receiver>, Shape, Fun, MayThrow>>;
228+
template <class CvrefSender, class Receiver, bool parallelize, class Shape, class Fun, bool MayThrow>
229+
using bulk_receiver_t = __t<
230+
bulk_receiver<__cvref_id<CvrefSender>, __id<Receiver>, parallelize, Shape, Fun, MayThrow>>;
225231

226-
template <class CvrefSenderId, class ReceiverId, std::integral Shape, class Fun>
232+
template <class CvrefSenderId, class ReceiverId, bool parallelize, std::integral Shape, class Fun>
227233
struct bulk_op_state {
228234
using CvrefSender = stdexec::__cvref_t<CvrefSenderId>;
229235
using Receiver = stdexec::__t<ReceiverId>;
230236
struct __t;
231237
};
232238

233-
template <class Sender, class Receiver, std::integral Shape, class Fun>
234-
using bulk_op_state_t =
235-
__t<bulk_op_state<__id<__decay_t<Sender>>, __id<__decay_t<Receiver>>, Shape, Fun>>;
239+
template <class Sender, class Receiver, bool parallelize, std::integral Shape, class Fun>
240+
using bulk_op_state_t = __t<
241+
bulk_op_state<__id<__decay_t<Sender>>, __id<__decay_t<Receiver>>, parallelize, Shape, Fun>>;
236242

237243
struct transform_bulk {
238244
template <class Data, class Sender>
239-
auto operator()(bulk_t, Data&& data, Sender&& sndr) {
240-
auto [shape, fun] = static_cast<Data&&>(data);
241-
return bulk_sender_t<Sender, decltype(shape), decltype(fun)>{
245+
auto operator()(bulk_chunked_t, Data&& data, Sender&& sndr) {
246+
auto [pol, shape, fun] = static_cast<Data&&>(data);
247+
using policy_t = std::remove_cvref_t<decltype(pol.__get())>;
248+
constexpr bool parallelize = std::same_as<policy_t, parallel_policy>
249+
|| std::same_as<policy_t, parallel_unsequenced_policy>;
250+
return bulk_sender_t<Sender, parallelize, decltype(shape), decltype(fun)>{
242251
pool_, static_cast<Sender&&>(sndr), shape, std::move(fun)};
243252
}
244253

@@ -264,7 +273,7 @@ namespace exec {
264273
public:
265274
struct domain : stdexec::default_domain {
266275
// For eager customization
267-
template <sender_expr_for<bulk_t> Sender>
276+
template <sender_expr_for<bulk_chunked_t> Sender>
268277
auto transform_sender(Sender&& sndr) const noexcept {
269278
if constexpr (__completes_on<Sender, static_thread_pool_::scheduler>) {
270279
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr));
@@ -278,8 +287,8 @@ namespace exec {
278287
}
279288
}
280289

281-
// transform the generic bulk sender into a parallel thread-pool bulk sender
282-
template <sender_expr_for<bulk_t> Sender, class Env>
290+
// transform the generic bulk_chunked sender into a parallel thread-pool bulk sender
291+
template <sender_expr_for<bulk_chunked_t> Sender, class Env>
283292
auto transform_sender(Sender&& sndr, const Env& env) const noexcept {
284293
if constexpr (__completes_on<Sender, static_thread_pool_::scheduler>) {
285294
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr));
@@ -680,9 +689,8 @@ namespace exec {
680689

681690
for (std::uint32_t index = 0; index < threadCount; ++index) {
682691
threadStates_[index].emplace(this, index, params, numa_);
683-
threadIndexByNumaNode_.push_back(
684-
thread_index_by_numa_node{
685-
.numa_node = threadStates_[index]->numa_node(), .thread_index = index});
692+
threadIndexByNumaNode_.push_back(thread_index_by_numa_node{
693+
.numa_node = threadStates_[index]->numa_node(), .thread_index = index});
686694
}
687695

688696
// NOLINTNEXTLINE(modernize-use-ranges) we still support platforms without the std::ranges algorithms
@@ -1103,8 +1111,8 @@ namespace exec {
11031111

11041112
//////////////////////////////////////////////////////////////////////////////////////////////////
11051113
// What follows is the implementation for parallel bulk execution on static_thread_pool_.
1106-
template <class SenderId, std::integral Shape, class Fun>
1107-
struct static_thread_pool_::bulk_sender<SenderId, Shape, Fun>::__t {
1114+
template <class SenderId, bool parallelize, std::integral Shape, class Fun>
1115+
struct static_thread_pool_::bulk_sender<SenderId, parallelize, Shape, Fun>::__t {
11081116
using __id = bulk_sender;
11091117
using sender_concept = sender_t;
11101118

@@ -1135,7 +1143,8 @@ namespace exec {
11351143

11361144
template <class Self, class Receiver>
11371145
using bulk_op_state_t = //
1138-
stdexec::__t<bulk_op_state<__cvref_id<Self, Sender>, stdexec::__id<Receiver>, Shape, Fun>>;
1146+
stdexec::__t<
1147+
bulk_op_state<__cvref_id<Self, Sender>, stdexec::__id<Receiver>, parallelize, Shape, Fun>>;
11391148

11401149
template <__decays_to<__t> Self, receiver Receiver>
11411150
requires receiver_of<Receiver, __completions_t<Self, env_of_t<Receiver>>>
@@ -1166,7 +1175,7 @@ namespace exec {
11661175
};
11671176

11681177
//! The customized operation state for `stdexec::bulk` operations
1169-
template <class CvrefSender, class Receiver, class Shape, class Fun, bool MayThrow>
1178+
template <class CvrefSender, class Receiver, bool parallelize, class Shape, class Fun, bool MayThrow>
11701179
struct static_thread_pool_::bulk_shared_state {
11711180
//! The actual `bulk_task` holds a pointer to the shared state
11721181
//! and its `__execute` function reads from that shared state.
@@ -1184,9 +1193,7 @@ namespace exec {
11841193
// In the case that the shape is much larger than the total number of threads,
11851194
// then each call to computation will call the function many times.
11861195
auto [begin, end] = even_share(sh_state.shape_, tid, total_threads);
1187-
for (Shape i = begin; i < end; ++i) {
1188-
sh_state.fun_(i, args...);
1189-
}
1196+
sh_state.fun_(begin, end, args...);
11901197
};
11911198

11921199
auto completion = [&](auto&... args) {
@@ -1252,8 +1259,12 @@ namespace exec {
12521259
//! That is, we don't need an agent for each of the shape values.
12531260
[[nodiscard]]
12541261
auto num_agents_required() const -> std::uint32_t {
1255-
return static_cast<std::uint32_t>(
1256-
std::min(shape_, static_cast<Shape>(pool_.available_parallelism())));
1262+
if constexpr (parallelize) {
1263+
return static_cast<std::uint32_t>(
1264+
std::min(shape_, static_cast<Shape>(pool_.available_parallelism())));
1265+
} else {
1266+
return static_cast<std::uint32_t>(1);
1267+
}
12571268
}
12581269

12591270
template <class F>
@@ -1282,12 +1293,20 @@ namespace exec {
12821293
};
12831294

12841295
//! A customized receiver to allow parallel execution of `stdexec::bulk` operations:
1285-
template <class CvrefSenderId, class ReceiverId, class Shape, class Fun, bool MayThrow>
1286-
struct static_thread_pool_::bulk_receiver<CvrefSenderId, ReceiverId, Shape, Fun, MayThrow>::__t {
1296+
template <
1297+
class CvrefSenderId,
1298+
class ReceiverId,
1299+
bool parallelize,
1300+
class Shape,
1301+
class Fun,
1302+
bool MayThrow>
1303+
struct static_thread_pool_::
1304+
bulk_receiver<CvrefSenderId, ReceiverId, parallelize, Shape, Fun, MayThrow>::__t {
12871305
using __id = bulk_receiver;
12881306
using receiver_concept = receiver_t;
12891307

1290-
using shared_state = bulk_shared_state<CvrefSender, Receiver, Shape, Fun, MayThrow>;
1308+
using shared_state =
1309+
bulk_shared_state<CvrefSender, Receiver, parallelize, Shape, Fun, MayThrow>;
12911310

12921311
shared_state& shared_state_;
12931312

@@ -1337,8 +1356,9 @@ namespace exec {
13371356
}
13381357
};
13391358

1340-
template <class CvrefSenderId, class ReceiverId, std::integral Shape, class Fun>
1341-
struct static_thread_pool_::bulk_op_state<CvrefSenderId, ReceiverId, Shape, Fun>::__t {
1359+
template <class CvrefSenderId, class ReceiverId, bool parallelize, std::integral Shape, class Fun>
1360+
struct static_thread_pool_::bulk_op_state<CvrefSenderId, ReceiverId, parallelize, Shape, Fun>::
1361+
__t {
13421362
using __id = bulk_op_state;
13431363

13441364
static constexpr bool may_throw = //
@@ -1348,8 +1368,9 @@ namespace exec {
13481368
__mbind_front_q<bulk_non_throwing, Fun, Shape>,
13491369
__q<__mand>>>;
13501370

1351-
using bulk_rcvr = bulk_receiver_t<CvrefSender, Receiver, Shape, Fun, may_throw>;
1352-
using shared_state = bulk_shared_state<CvrefSender, Receiver, Shape, Fun, may_throw>;
1371+
using bulk_rcvr = bulk_receiver_t<CvrefSender, Receiver, parallelize, Shape, Fun, may_throw>;
1372+
using shared_state =
1373+
bulk_shared_state<CvrefSender, Receiver, parallelize, Shape, Fun, may_throw>;
13531374
using inner_op_state = connect_result_t<CvrefSender, bulk_rcvr>;
13541375

13551376
shared_state shared_state_;

include/exec/system_context.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,8 @@ namespace exec {
615615
struct __transform_parallel_bulk_sender {
616616
template <class _Data, class _Previous>
617617
auto operator()(stdexec::bulk_t, _Data&& __data, _Previous&& __previous) const noexcept {
618-
auto [__shape, __fn] = static_cast<_Data&&>(__data);
618+
auto [__pol, __shape, __fn] = static_cast<_Data&&>(__data);
619+
// TODO: handle non-par execution policies
619620
return __parallel_bulk_sender<_Previous, decltype(__shape), decltype(__fn)>{
620621
__sched_, static_cast<_Previous&&>(__previous), __shape, std::move(__fn)};
621622
}

include/nvexec/stream/bulk.cuh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ namespace nvexec::_strm {
393393
struct transform_sender_for<stdexec::bulk_t> {
394394
template <class Data, stream_completing_sender Sender>
395395
auto operator()(__ignore, Data data, Sender&& sndr) const {
396-
auto [shape, fun] = static_cast<Data&&>(data);
396+
auto [policy, shape, fun] = static_cast<Data&&>(data);
397397
using Shape = decltype(shape);
398398
using Fn = decltype(fun);
399399
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr));

0 commit comments

Comments
 (0)